From 34d40980b66a7c62e9ba4a8a5214f90e8484f8a5 Mon Sep 17 00:00:00 2001 From: William Blake Galbreath Date: Fri, 12 Jul 2019 19:20:25 -0500 Subject: [PATCH] Add leaf's Async Chunk IO patch --- .../0052-Asynchronous-chunk-loading-api.patch | 243 +++ .../server/0053-Asynchronous-chunk-IO.patch | 1899 +++++++++++++++++ 2 files changed, 2142 insertions(+) create mode 100644 patches/server/0052-Asynchronous-chunk-loading-api.patch create mode 100644 patches/server/0053-Asynchronous-chunk-IO.patch diff --git a/patches/server/0052-Asynchronous-chunk-loading-api.patch b/patches/server/0052-Asynchronous-chunk-loading-api.patch new file mode 100644 index 000000000..e8da6ff1d --- /dev/null +++ b/patches/server/0052-Asynchronous-chunk-loading-api.patch @@ -0,0 +1,243 @@ +From 440cb45fa9f3b61badc7545a04a0daff24e474f4 Mon Sep 17 00:00:00 2001 +From: Spottedleaf +Date: Mon, 8 Jul 2019 03:24:59 -0700 +Subject: [PATCH] Asynchronous chunk loading api + +--- + .../minecraft/server/ChunkProviderServer.java | 134 ++++++++++++++++++ + .../net/minecraft/server/ChunkStatus.java | 1 + + .../java/net/minecraft/server/MCUtil.java | 5 + + .../java/net/minecraft/server/RegionFile.java | 1 + + .../java/net/minecraft/server/TicketType.java | 1 + + .../org/bukkit/craftbukkit/CraftWorld.java | 19 +-- + 6 files changed, 152 insertions(+), 9 deletions(-) + +diff --git a/src/main/java/net/minecraft/server/ChunkProviderServer.java b/src/main/java/net/minecraft/server/ChunkProviderServer.java +index db9113994e..b46285ecdc 100644 +--- a/src/main/java/net/minecraft/server/ChunkProviderServer.java ++++ b/src/main/java/net/minecraft/server/ChunkProviderServer.java +@@ -147,6 +147,140 @@ public class ChunkProviderServer extends IChunkProvider { + return playerChunk.getAvailableChunkNow(); + + } ++ ++ private long asyncLoadSeqCounter; ++ ++ public void getChunkAtAsynchronously(int x, int z, boolean gen, java.util.function.Consumer onComplete) { ++ if (Thread.currentThread() != this.serverThread) { ++ this.serverThreadQueue.execute(() -> { ++ this.getChunkAtAsynchronously(x, z, gen, onComplete); ++ }); ++ return; ++ } ++ ++ long k = ChunkCoordIntPair.pair(x, z); ++ ChunkCoordIntPair chunkPos = new ChunkCoordIntPair(x, z); ++ ++ IChunkAccess ichunkaccess; ++ ++ // try cache ++ for (int l = 0; l < 4; ++l) { ++ if (k == this.cachePos[l] && ChunkStatus.FULL == this.cacheStatus[l]) { ++ ichunkaccess = this.cacheChunk[l]; ++ if (ichunkaccess != null) { // CraftBukkit - the chunk can become accessible in the meantime TODO for non-null chunks it might also make sense to check that the chunk's state hasn't changed in the meantime ++ ++ // move to first in cache ++ ++ for (int i1 = 3; i1 > 0; --i1) { ++ this.cachePos[i1] = this.cachePos[i1 - 1]; ++ this.cacheStatus[i1] = this.cacheStatus[i1 - 1]; ++ this.cacheChunk[i1] = this.cacheChunk[i1 - 1]; ++ } ++ ++ this.cachePos[0] = k; ++ this.cacheStatus[0] = ChunkStatus.FULL; ++ this.cacheChunk[0] = ichunkaccess; ++ ++ onComplete.accept((Chunk)ichunkaccess); ++ ++ return; ++ } ++ } ++ } ++ ++ if (gen) { ++ this.bringToFullStatusAsync(x, z, chunkPos, onComplete); ++ return; ++ } ++ ++ IChunkAccess current = this.getChunkAtImmediately(x, z); // we want to bypass ticket restrictions ++ if (current != null) { ++ if (!(current instanceof ProtoChunkExtension) && !(current instanceof net.minecraft.server.Chunk)) { ++ onComplete.accept(null); // the chunk is not gen'd ++ return; ++ } ++ // we know the chunk is at full status here (either in read-only mode or the real thing) ++ this.bringToFullStatusAsync(x, z, chunkPos, onComplete); ++ return; ++ } else { ++ RegionFile file; ++ ++ try { ++ file = this.world.getChunkProvider().playerChunkMap.getRegionFile(chunkPos, false); ++ } catch (IOException ex) { ++ throw new RuntimeException(ex); ++ } ++ ++ ChunkStatus status; ++ if (!file.chunkExists(chunkPos) || ((status = file.getStatusIfCached(x, z)) != null && status != ChunkStatus.FULL)) { ++ onComplete.accept(null); // cached status says not generated, or data does not exist on disk ++ return; ++ } ++ ++ ++ if (status == ChunkStatus.FULL) { ++ // at this stage we know it is fully generated, but is on disk ++ this.bringToFullStatusAsync(x, z, chunkPos, onComplete); ++ return; ++ } ++ ++ // at this stage we don't know what status the chunk is in ++ } ++ ++ // here we don't know what status it is and we're not supposed to generate ++ // so we asynchronously load empty status ++ ++ this.bringToStatusAsync(x, z, chunkPos, ChunkStatus.EMPTY, (IChunkAccess chunk) -> { ++ if (!(chunk instanceof ProtoChunkExtension) && !(chunk instanceof net.minecraft.server.Chunk)) { ++ // the chunk on disk was not a full status chunk ++ onComplete.accept(null); ++ return; ++ } ++ this.bringToFullStatusAsync(x, z, chunkPos, onComplete); // bring to full status if required ++ }); ++ } ++ ++ private void bringToFullStatusAsync(int x, int z, ChunkCoordIntPair chunkPos, java.util.function.Consumer onComplete) { ++ this.bringToStatusAsync(x, z, chunkPos, ChunkStatus.FULL, (java.util.function.Consumer)onComplete); ++ } ++ ++ ++ private void bringToStatusAsync(int x, int z, ChunkCoordIntPair chunkPos, ChunkStatus status, java.util.function.Consumer onComplete) { ++ CompletableFuture> future = this.getChunkFutureMainThread(x, z, status, true); ++ long identifier = this.asyncLoadSeqCounter++; ++ int ticketLevel = MCUtil.getTicketLevelFor(status); ++ this.addTicketAtLevel(TicketType.ASYNC_LOAD, chunkPos, ticketLevel, identifier); ++ ++ future.whenCompleteAsync((Either either, Throwable throwable) -> { ++ // either left -> success ++ // either right -> failure ++ ++ if (throwable != null) { ++ throw new RuntimeException(throwable); ++ } ++ ++ this.removeTicketAtLevel(TicketType.ASYNC_LOAD, chunkPos, ticketLevel, identifier); ++ this.addTicketAtLevel(TicketType.UNKNOWN, chunkPos, ticketLevel, chunkPos); // allow unloading ++ ++ Optional failure = either.right(); ++ ++ if (failure.isPresent()) { ++ // failure ++ throw new IllegalStateException("Chunk failed to load: " + failure.get().toString()); ++ } ++ ++ onComplete.accept(either.left().get()); ++ ++ }, this.serverThreadQueue); ++ } ++ ++ public void addTicketAtLevel(TicketType ticketType, ChunkCoordIntPair chunkPos, int ticketLevel, T identifier) { ++ this.chunkMapDistance.addTicketAtLevel(ticketType, chunkPos, ticketLevel, identifier); ++ } ++ ++ public void removeTicketAtLevel(TicketType ticketType, ChunkCoordIntPair chunkPos, int ticketLevel, T identifier) { ++ this.chunkMapDistance.removeTicketAtLevel(ticketType, chunkPos, ticketLevel, identifier); ++ } + // Paper end + + @Nullable +diff --git a/src/main/java/net/minecraft/server/ChunkStatus.java b/src/main/java/net/minecraft/server/ChunkStatus.java +index e324989b46..abb0d69d2f 100644 +--- a/src/main/java/net/minecraft/server/ChunkStatus.java ++++ b/src/main/java/net/minecraft/server/ChunkStatus.java +@@ -153,6 +153,7 @@ public class ChunkStatus { + return ChunkStatus.q.size(); + } + ++ public static int getTicketLevelOffset(ChunkStatus status) { return ChunkStatus.a(status); } // Paper - OBFHELPER + public static int a(ChunkStatus chunkstatus) { + return ChunkStatus.r.getInt(chunkstatus.c()); + } +diff --git a/src/main/java/net/minecraft/server/MCUtil.java b/src/main/java/net/minecraft/server/MCUtil.java +index 23d1935dd5..14f8b61042 100644 +--- a/src/main/java/net/minecraft/server/MCUtil.java ++++ b/src/main/java/net/minecraft/server/MCUtil.java +@@ -530,4 +530,9 @@ public final class MCUtil { + out.print(fileData); + } + } ++ ++ public static int getTicketLevelFor(ChunkStatus status) { ++ // TODO make sure the constant `33` is correct on future updates. See getChunkAt(int, int, ChunkStatus, boolean) ++ return 33 + ChunkStatus.getTicketLevelOffset(status); ++ } + } +diff --git a/src/main/java/net/minecraft/server/RegionFile.java b/src/main/java/net/minecraft/server/RegionFile.java +index d610253b95..18f218e971 100644 +--- a/src/main/java/net/minecraft/server/RegionFile.java ++++ b/src/main/java/net/minecraft/server/RegionFile.java +@@ -327,6 +327,7 @@ public class RegionFile implements AutoCloseable { + return this.c[this.f(chunkcoordintpair)]; + } + ++ public boolean chunkExists(ChunkCoordIntPair chunkPos) { return this.d(chunkPos); } // Paper - OBFHELPER + public boolean d(ChunkCoordIntPair chunkcoordintpair) { + return this.getOffset(chunkcoordintpair) != 0; + } +diff --git a/src/main/java/net/minecraft/server/TicketType.java b/src/main/java/net/minecraft/server/TicketType.java +index 5acb0732c3..0ed2d2fbf9 100644 +--- a/src/main/java/net/minecraft/server/TicketType.java ++++ b/src/main/java/net/minecraft/server/TicketType.java +@@ -22,6 +22,7 @@ public class TicketType { + public static final TicketType PLUGIN = a("plugin", (a, b) -> 0); // CraftBukkit + public static final TicketType PLUGIN_TICKET = a("plugin_ticket", (plugin1, plugin2) -> plugin1.getClass().getName().compareTo(plugin2.getClass().getName())); // Craftbukkit + public static final TicketType ANTIXRAY = a("antixray", Integer::compareTo); // Paper - Anti-Xray ++ public static final TicketType ASYNC_LOAD = a("async_load", Long::compareTo); // Paper + + public static TicketType a(String s, Comparator comparator) { + return new TicketType<>(s, comparator, 0L); +diff --git a/src/main/java/org/bukkit/craftbukkit/CraftWorld.java b/src/main/java/org/bukkit/craftbukkit/CraftWorld.java +index 85b5c2bff0..aabb2ae26c 100644 +--- a/src/main/java/org/bukkit/craftbukkit/CraftWorld.java ++++ b/src/main/java/org/bukkit/craftbukkit/CraftWorld.java +@@ -2283,16 +2283,17 @@ public class CraftWorld implements World { + + @Override + public CompletableFuture getChunkAtAsync(int x, int z, boolean gen) { +- // TODO placeholder +- if (Bukkit.isPrimaryThread()) { +- return CompletableFuture.completedFuture(getChunkAtGen(x, z, gen)); +- } else { +- CompletableFuture ret = new CompletableFuture<>(); +- net.minecraft.server.MinecraftServer.getServer().scheduleOnMain(() -> { +- ret.complete(getChunkAtGen(x, z, gen)); +- }); +- return ret; ++ net.minecraft.server.Chunk immediate = this.world.getChunkProvider().getChunkAtIfLoadedImmediately(x, z); ++ if (immediate != null) { ++ return CompletableFuture.completedFuture(immediate.bukkitChunk); + } ++ ++ CompletableFuture ret = new CompletableFuture<>(); ++ this.world.getChunkProvider().getChunkAtAsynchronously(x, z, gen, (net.minecraft.server.Chunk chunk) -> { ++ ret.complete(chunk == null ? null : chunk.bukkitChunk); ++ }); ++ ++ return ret; + } + // Paper end + +-- +2.20.1 + diff --git a/patches/server/0053-Asynchronous-chunk-IO.patch b/patches/server/0053-Asynchronous-chunk-IO.patch new file mode 100644 index 000000000..4d23e1570 --- /dev/null +++ b/patches/server/0053-Asynchronous-chunk-IO.patch @@ -0,0 +1,1899 @@ +From c2a26437cd05c2c5023a3e609ccd6dfd25757f69 Mon Sep 17 00:00:00 2001 +From: Spottedleaf +Date: Tue, 9 Jul 2019 03:38:23 -0700 +Subject: [PATCH] Asynchronous chunk IO + +--- + .../paper/io/ConcreteFileIOThread.java | 652 ++++++++++++++++++ + .../com/destroystokyo/paper/io/IOUtil.java | 62 ++ + .../paper/io/PrioritizedTaskQueue.java | 262 +++++++ + .../paper/io/QueueExecutorThread.java | 200 ++++++ + .../minecraft/server/ChunkProviderServer.java | 5 + + .../net/minecraft/server/MinecraftServer.java | 1 + + .../net/minecraft/server/PlayerChunkMap.java | 152 +++- + .../java/net/minecraft/server/RegionFile.java | 17 +- + .../net/minecraft/server/RegionFileCache.java | 6 +- + .../minecraft/server/RegionFileSection.java | 57 +- + .../net/minecraft/server/VillagePlace.java | 64 +- + .../net/minecraft/server/WorldServer.java | 72 ++ + 12 files changed, 1498 insertions(+), 52 deletions(-) + create mode 100644 src/main/java/com/destroystokyo/paper/io/ConcreteFileIOThread.java + create mode 100644 src/main/java/com/destroystokyo/paper/io/IOUtil.java + create mode 100644 src/main/java/com/destroystokyo/paper/io/PrioritizedTaskQueue.java + create mode 100644 src/main/java/com/destroystokyo/paper/io/QueueExecutorThread.java + +diff --git a/src/main/java/com/destroystokyo/paper/io/ConcreteFileIOThread.java b/src/main/java/com/destroystokyo/paper/io/ConcreteFileIOThread.java +new file mode 100644 +index 0000000000..13194d1978 +--- /dev/null ++++ b/src/main/java/com/destroystokyo/paper/io/ConcreteFileIOThread.java +@@ -0,0 +1,652 @@ ++package com.destroystokyo.paper.io; ++ ++import net.minecraft.server.ChunkCoordIntPair; ++import net.minecraft.server.ExceptionWorldConflict; ++import net.minecraft.server.MinecraftServer; ++import net.minecraft.server.NBTTagCompound; ++import net.minecraft.server.RegionFile; ++import net.minecraft.server.WorldServer; ++import org.apache.logging.log4j.Logger; ++ ++import java.io.IOException; ++import java.util.concurrent.CompletableFuture; ++import java.util.concurrent.ConcurrentHashMap; ++import java.util.concurrent.atomic.AtomicLong; ++import java.util.function.Consumer; ++import java.util.function.Function; ++ ++/** ++ * Prioritized singleton thread responsible for all chunk IO that occurs in a minecraft server. ++ * ++ *

++ * Singleton access: {@link Holder#INSTANCE} ++ *

++ * ++ *

++ * All functions provided are MT-Safe, however certain ordering constraints are (but not enforced): ++ *

  • ++ * Chunk saves may not occur for unloaded chunks. ++ *
  • ++ *
  • ++ * Tasks must be scheduled on the main thread. ++ *
  • ++ *

    ++ * ++ * @see Holder#INSTANCE ++ * @see #scheduleSave(WorldServer, int, int, NBTTagCompound, NBTTagCompound, int) ++ * @see #loadChunkDataAsync(WorldServer, int, int, int, Consumer, boolean, boolean, boolean) ++ */ ++public final class ConcreteFileIOThread extends QueueExecutorThread { ++ ++ private static final Logger LOGGER = MinecraftServer.LOGGER; ++ public static final NBTTagCompound FAILURE_VALUE = new NBTTagCompound(); ++ ++ public static final class Holder { ++ ++ public static final ConcreteFileIOThread INSTANCE = new ConcreteFileIOThread(); ++ ++ static { ++ INSTANCE.start(); ++ } ++ } ++ ++ private final AtomicLong writeCounter = new AtomicLong(); ++ ++ private ConcreteFileIOThread() { ++ super(new PrioritizedTaskQueue<>(), (int) (1.0e6)); // 1.0ms spinwait time ++ this.setName("Concrete RegionFile IO Thread"); ++ this.setPriority(Thread.NORM_PRIORITY - 1); // we keep priority close to normal because threads can wait on us ++ this.setUncaughtExceptionHandler((final Thread unused, final Throwable thr) -> { ++ LOGGER.fatal("Uncaught exception thrown from IO thread, report this!", thr); ++ }); ++ } ++ ++ /* run() is implemented by superclass */ ++ ++ /* ++ * ++ * IO thread will perform reads before writes ++ * ++ * How reads/writes are scheduled: ++ * ++ * If read in progress while scheduling write, ignore read and schedule write ++ * If read in progress while scheduling read (no write in progress), chain the read task ++ * ++ * ++ * If write in progress while scheduling read, use the pending write data and ret immediately ++ * If write in progress while scheduling write (ignore read in progress), overwrite the write in progress data ++ * ++ * This allows the reads and writes to act as if they occur synchronously to the thread scheduling them, however ++ * it fails to properly propagate write failures ++ * ++ */ ++ ++ /** ++ * Attempts to bump the priority of all IO tasks for the given chunk coordinates. This has no effect if no tasks are queued. ++ * ++ * @param world Chunk's world ++ * @param chunkX Chunk's x coordinate ++ * @param chunkZ Chunk's z coordinate ++ * @param priority Priority level to try to bump to ++ */ ++ public void bumpPriority(final WorldServer world, final int chunkX, final int chunkZ, final int priority) { ++ if (!PrioritizedTaskQueue.validPriority(priority)) { ++ throw new IllegalArgumentException("Invalid priority: " + priority); ++ } ++ ++ final Long key = Long.valueOf(IOUtil.getCoordinateKey(chunkX, chunkZ)); ++ ++ final ChunkDataTask poiTask = world.poiDataController.tasks.get(key); ++ final ChunkDataTask chunkTask = world.chunkDataController.tasks.get(key); ++ ++ if (poiTask != null) { ++ poiTask.raisePriority(priority); ++ } ++ if (chunkTask != null) { ++ chunkTask.raisePriority(priority); ++ } ++ } ++ ++ // Hack start ++ ++ /** ++ * if {@code waitForRead} is true, then this task will wait on an available read task, else it will wait on an available ++ * write task ++ * if {@code poiTask} is true, then this task will wait on a poi task, else it will wait on chunk data task ++ * ++ * @return whether the task succeeded, or {@code null} if there is no task ++ * @deprecated API is garbage and will only work for main thread queueing of tasks (which is vanilla), plugins messing ++ * around asynchronously will give unexpected results ++ */ ++ @Deprecated ++ public Boolean waitForIOToComplete(final WorldServer world, final int chunkX, final int chunkZ, boolean waitForRead, boolean poiTask) { ++ final ChunkDataTask task; ++ ++ final Long key = IOUtil.getCoordinateKey(chunkX, chunkZ); ++ if (poiTask) { ++ task = world.poiDataController.tasks.get(key); ++ } else { ++ task = world.chunkDataController.tasks.get(key); ++ } ++ ++ if (task == null) { ++ return null; ++ } ++ ++ if (waitForRead) { ++ ChunkDataController.InProgressRead read = task.inProgressRead; ++ if (read == null) { ++ return null; ++ } ++ return Boolean.valueOf(read.readFuture.join() != null); ++ } ++ ++ // wait for write ++ ChunkDataController.InProgressWrite write = task.inProgressWrite; ++ if (write == null) { ++ return null; ++ } ++ return Boolean.valueOf(write.wrote.join() != null); ++ } ++ // Hack end ++ ++ /** ++ * Sets the priority of all IO tasks for the given chunk coordinates. This has no effect if no tasks are queued. ++ * ++ * @param world Chunk's world ++ * @param chunkX Chunk's x coordinate ++ * @param chunkZ Chunk's z coordinate ++ * @param priority Priority level to set to ++ */ ++ public void setPriority(final WorldServer world, final int chunkX, final int chunkZ, final int priority) { ++ if (!PrioritizedTaskQueue.validPriority(priority)) { ++ throw new IllegalArgumentException("Invalid priority: " + priority); ++ } ++ ++ final Long key = Long.valueOf(IOUtil.getCoordinateKey(chunkX, chunkZ)); ++ ++ final ChunkDataTask poiTask = world.poiDataController.tasks.get(key); ++ final ChunkDataTask chunkTask = world.chunkDataController.tasks.get(key); ++ ++ if (poiTask != null) { ++ poiTask.updatePriority(priority); ++ } ++ if (chunkTask != null) { ++ chunkTask.updatePriority(priority); ++ } ++ } ++ ++ /** ++ * Schedules the chunk data to be written asynchronously. ++ *

    ++ * Impl notes: ++ *

    ++ *
  • ++ * This function presumes a chunk load for the coordinates is not called during this function (anytime after is OK). This means ++ * saves must be scheduled before a chunk is unloaded. ++ *
  • ++ *
  • ++ * Writes may be called concurrently, although only the "later" write will go through. ++ *
  • ++ * ++ * @param world Chunk's world ++ * @param chunkX Chunk's x coordinate ++ * @param chunkZ Chunk's z coordinate ++ * @param poiData Chunk point of interest data. If {@code null}, then no poi data is saved. ++ * @param chunkData Chunk data. If {@code null}, then no chunk data is saved. ++ * @param priority Priority level for this task. See {@link PrioritizedTaskQueue} ++ * @throws IllegalArgumentException If both {@code poiData} and {@code chunkData} are {@code null}. ++ * @throws IllegalStateException If the file io thread has shutdown. ++ */ ++ public void scheduleSave(final WorldServer world, final int chunkX, final int chunkZ, ++ final NBTTagCompound poiData, final NBTTagCompound chunkData, ++ final int priority) throws IllegalArgumentException { ++ if (!PrioritizedTaskQueue.validPriority(priority)) { ++ throw new IllegalArgumentException("Invalid priority: " + priority); ++ } ++ ++ final long writeCounter = this.writeCounter.getAndIncrement(); ++ ++ if (poiData != null) { ++ this.scheduleWrite(world.poiDataController, world, chunkX, chunkZ, poiData, priority, writeCounter); ++ } ++ if (chunkData != null) { ++ this.scheduleWrite(world.chunkDataController, world, chunkX, chunkZ, chunkData, priority, writeCounter); ++ } ++ } ++ ++ private void scheduleWrite(final ChunkDataController dataController, final WorldServer world, ++ final int chunkX, final int chunkZ, final NBTTagCompound data, final int priority, final long writeCounter) { ++ dataController.tasks.compute(Long.valueOf(IOUtil.getCoordinateKey(chunkX, chunkZ)), (final Long keyInMap, final ChunkDataTask taskRunning) -> { ++ if (taskRunning == null) { ++ // no task is scheduled ++ ++ // create task ++ final ChunkDataTask newTask = new ChunkDataTask(priority, world, chunkX, chunkZ, dataController); ++ newTask.inProgressWrite = new ChunkDataController.InProgressWrite(); ++ newTask.inProgressWrite.writeCounter = writeCounter; ++ newTask.inProgressWrite.data = data; ++ ++ ConcreteFileIOThread.this.queueTask(newTask); // schedule ++ return newTask; ++ } ++ ++ taskRunning.raisePriority(priority); ++ ++ if (taskRunning.inProgressWrite == null) { ++ taskRunning.inProgressWrite = new ChunkDataController.InProgressWrite(); ++ } ++ ++ boolean reschedule = taskRunning.inProgressWrite.writeCounter == -1L; ++ ++ // synchronize for readers ++ //noinspection SynchronizationOnLocalVariableOrMethodParameter ++ synchronized (taskRunning) { ++ taskRunning.inProgressWrite.data = data; ++ taskRunning.inProgressWrite.writeCounter = writeCounter; ++ } ++ ++ if (reschedule) { ++ // We need to reschedule this task since the previous one is not currently scheduled since it failed ++ taskRunning.reschedule(priority); ++ } ++ ++ return taskRunning; ++ }); ++ } ++ ++ /** ++ * Same as {@link #loadChunkDataAsync(WorldServer, int, int, int, Consumer, boolean, boolean, boolean)}, except this function returns ++ * a {@link CompletableFuture} which is potentially completed ASYNCHRONOUSLY ON THE FILE IO THREAD when the load task ++ * has completed. ++ *

    ++ * Note that if the chunk fails to load the returned future is completed with {@code null}. ++ *

    ++ */ ++ public CompletableFuture loadChunkDataAsyncFuture(final WorldServer world, final int chunkX, final int chunkZ, ++ final int priority, final boolean readPoiData, final boolean readChunkData, ++ final boolean intendingToBlock) { ++ final CompletableFuture future = new CompletableFuture<>(); ++ this.loadChunkDataAsync(world, chunkX, chunkZ, priority, future::complete, readPoiData, readChunkData, intendingToBlock); ++ return future; ++ } ++ ++ /** ++ * Schedules a load to be executed asynchronously. ++ *

    ++ * Impl notes: ++ *

    ++ *
  • ++ * If a chunk fails to load, the {@code onComplete} parameter is completed with {@code null}. ++ *
  • ++ *
  • ++ * It is possible for the {@code onComplete} parameter to be given {@link ChunkData} containing data ++ * this call did not request. ++ *
  • ++ *
  • ++ * The {@code onComplete} parameter may be completed during the execution of this function synchronously or it may ++ * be completed asynchronously on this file io thread. Interacting with the file IO thread in the completion of ++ * data is undefined behaviour, and can cause deadlock. ++ *
  • ++ * ++ * @param world Chunk's world ++ * @param chunkX Chunk's x coordinate ++ * @param chunkZ Chunk's z coordinate ++ * @param priority Priority level for this task. See {@link PrioritizedTaskQueue} ++ * @param onComplete Consumer to execute once this task has completed ++ * @param readPoiData Whether to read point of interest data. If {@code false}, the {@code NBTTagCompound} will be {@code null}. ++ * @param readChunkData Whether to read chunk data. If {@code false}, the {@code NBTTagCompound} will be {@code null}. ++ * @return The {@link PrioritizedTaskQueue.PrioritizedTask} associated with this task. Note that this task does not support ++ * cancellation. ++ */ ++ public void loadChunkDataAsync(final WorldServer world, final int chunkX, final int chunkZ, ++ final int priority, final Consumer onComplete, ++ final boolean readPoiData, final boolean readChunkData, ++ final boolean intendingToBlock) { ++ if (!PrioritizedTaskQueue.validPriority(priority)) { ++ throw new IllegalArgumentException("Invalid priority: " + priority); ++ } ++ ++ if (!(readPoiData | readChunkData)) { ++ throw new IllegalArgumentException("Must read chunk data or poi data"); ++ } ++ ++ final ChunkData complete = new ChunkData(); ++ final boolean[] requireCompletion = new boolean[]{readPoiData, readChunkData}; ++ ++ if (readPoiData) { ++ this.scheduleRead(world.poiDataController, world, chunkX, chunkZ, (final NBTTagCompound poiData) -> { ++ complete.poiData = poiData; ++ ++ final boolean finished; ++ ++ // avoid a race condition where the file io thread completes and we complete synchronously ++ // Note: Synchronization can be elided if both of the accesses are volatile ++ synchronized (requireCompletion) { ++ requireCompletion[0] = false; // 0 -> poi data ++ finished = !requireCompletion[1]; // 1 -> chunk data ++ } ++ ++ if (finished) { ++ onComplete.accept(complete); ++ } ++ }, priority, intendingToBlock); ++ } ++ ++ if (readChunkData) { ++ this.scheduleRead(world.chunkDataController, world, chunkX, chunkZ, (final NBTTagCompound chunkData) -> { ++ complete.chunkData = chunkData; ++ ++ final boolean finished; ++ ++ // avoid a race condition where the file io thread completes and we complete synchronously ++ // Note: Synchronization can be elided if both of the accesses are volatile ++ synchronized (requireCompletion) { ++ requireCompletion[1] = false; // 1 -> chunk data ++ finished = !requireCompletion[0]; // 0 -> poi data ++ } ++ ++ if (finished) { ++ onComplete.accept(complete); ++ } ++ }, priority, intendingToBlock); ++ } ++ ++ } ++ ++ // Note: the onComplete may be called asynchronously or synchronously here. ++ private void scheduleRead(final ChunkDataController dataController, final WorldServer world, ++ final int chunkX, final int chunkZ, final Consumer onComplete, final int priority, ++ final boolean intendingToBlock) { ++ ++ Function tryLoadFunction = (final RegionFile file) -> { ++ if (file == null) { ++ return Boolean.TRUE; ++ } ++ return Boolean.valueOf(file.chunkExists(new ChunkCoordIntPair(chunkX, chunkZ))); ++ }; ++ ++ dataController.tasks.compute(Long.valueOf(IOUtil.getCoordinateKey(chunkX, chunkZ)), (final Long keyInMap, final ChunkDataTask running) -> { ++ if (running == null) { ++ // not scheduled ++ ++ final Boolean shouldSchedule = intendingToBlock ? dataController.computeForRegionFile(chunkX, chunkZ, tryLoadFunction) : ++ dataController.computeForRegionFileIfLoaded(chunkX, chunkZ, tryLoadFunction); ++ ++ if (shouldSchedule == Boolean.FALSE) { ++ // not on disk ++ onComplete.accept(null); ++ return null; ++ } ++ ++ // set up task ++ final ChunkDataTask newTask = new ChunkDataTask(priority, world, chunkX, chunkZ, dataController); ++ newTask.inProgressRead = new ChunkDataController.InProgressRead(); ++ newTask.inProgressRead.readFuture.thenAccept(onComplete); ++ ++ ConcreteFileIOThread.this.queueTask(newTask); // schedule task ++ return newTask; ++ } ++ ++ running.raisePriority(priority); ++ ++ if (running.inProgressWrite == null) { ++ // chain to the read future ++ running.inProgressRead.readFuture.thenAccept(onComplete); ++ return running; ++ } ++ ++ // at this stage we have to use the in progress write's data to avoid an order issue ++ // we don't synchronize since all writes to data occur in the compute() call ++ onComplete.accept(running.inProgressWrite.data); ++ return running; ++ }); ++ } ++ ++ /** ++ * Same as {@link #loadChunkDataAsync(WorldServer, int, int, int, Consumer, boolean, boolean, boolean)}, except this function returns ++ * the {@link ChunkData} associated with the specified chunk when the task is complete. ++ * ++ * @return The chunk data, or {@code null} if the chunk failed to load. ++ */ ++ public ChunkData loadChunkData(final WorldServer world, final int chunkX, final int chunkZ, final int priority, ++ final boolean readPoiData, final boolean readChunkData) { ++ return this.loadChunkDataAsyncFuture(world, chunkX, chunkZ, priority, readPoiData, readChunkData, true).join(); ++ } ++ ++ /** ++ * Schedules the given task at the specified priority to be executed on the IO thread. ++ *

    ++ * Internal api. Do not use. ++ *

    ++ */ ++ public void runTask(final int priority, final Runnable runnable) { ++ this.queueTask(new GeneralTask(priority, runnable)); ++ } ++ ++ static final class GeneralTask extends PrioritizedTaskQueue.PrioritizedTask implements Runnable { ++ ++ private final Runnable run; ++ ++ public GeneralTask(final int priority, final Runnable run) { ++ super(priority); ++ this.run = IOUtil.notNull(run, "Task may not be null"); ++ } ++ ++ @Override ++ public void run() { ++ try { ++ this.run.run(); ++ } catch (final Throwable throwable) { ++ if (throwable instanceof ThreadDeath) { ++ throw (ThreadDeath) throwable; ++ } ++ LOGGER.fatal("Failed to execute general task on IO thread " + IOUtil.genericToString(this.run), throwable); ++ } ++ } ++ } ++ ++ public static final class ChunkData { ++ ++ public NBTTagCompound poiData; ++ public NBTTagCompound chunkData; ++ ++ public ChunkData() { ++ } ++ ++ public ChunkData(final NBTTagCompound poiData, final NBTTagCompound chunkData) { ++ this.poiData = poiData; ++ this.chunkData = chunkData; ++ } ++ } ++ ++ public static abstract class ChunkDataController { ++ ++ // ConcurrentHashMap synchronizes per chain, so reduce the chance of task's hashes colliding. ++ public final ConcurrentHashMap tasks = new ConcurrentHashMap<>(64, 0.5f); ++ ++ public abstract void writeData(final int x, final int z, final NBTTagCompound compound) throws IOException; ++ ++ public abstract NBTTagCompound readData(final int x, final int z) throws IOException; ++ ++ public abstract T computeForRegionFile(final int chunkX, final int chunkZ, final Function function); ++ ++ public abstract T computeForRegionFileIfLoaded(final int chunkX, final int chunkZ, final Function function); ++ ++ public static final class InProgressWrite { ++ public long writeCounter; ++ public NBTTagCompound data; ++ ++ // Hack start ++ @Deprecated ++ public CompletableFuture wrote = new CompletableFuture<>(); ++ // Hack end ++ } ++ ++ public static final class InProgressRead { ++ public final CompletableFuture readFuture = new CompletableFuture<>(); ++ } ++ } ++ ++ public static final class ChunkDataTask extends PrioritizedTaskQueue.PrioritizedTask implements Runnable { ++ ++ public ChunkDataController.InProgressWrite inProgressWrite; ++ public ChunkDataController.InProgressRead inProgressRead; ++ ++ private final WorldServer world; ++ private final int x; ++ private final int z; ++ private final ChunkDataController taskController; ++ ++ public ChunkDataTask(final int priority, final WorldServer world, final int x, final int z, final ChunkDataController taskController) { ++ super(priority); ++ this.world = world; ++ this.x = x; ++ this.z = z; ++ this.taskController = taskController; ++ } ++ ++ @Override ++ public String toString() { ++ return "Task for world " + this.world.getWorld().getName() + " at " + this.x + ", " + this.z + " poi: " + (this.taskController == this.world.poiDataController); ++ } ++ ++ /* ++ * ++ * IO thread will perform reads before writes ++ * ++ * How reads/writes are scheduled: ++ * ++ * If read in progress while scheduling write, ignore read and schedule write ++ * If read in progress while scheduling read (no write in progress), chain the read task ++ * ++ * ++ * If write in progress while scheduling read, use the pending write data and ret immediately ++ * If write in progress while scheduling write (ignore read in progress), overwrite the write in progress data ++ * ++ * This allows the reads and writes to act as if they occur synchronously to the thread scheduling them, however ++ * it fails to properly propagate write failures ++ * ++ */ ++ ++ void reschedule(final int priority) { ++ // priority is checked before this stage ++ this.queue.lazySet(null); ++ this.inProgressWrite.wrote = new CompletableFuture<>(); // Hack ++ this.priority.lazySet(priority); ++ ConcreteFileIOThread.Holder.INSTANCE.queueTask(this); ++ } ++ ++ @Override ++ public void run() { ++ ChunkDataController.InProgressRead read = this.inProgressRead; ++ if (read != null) { ++ NBTTagCompound compound = ConcreteFileIOThread.FAILURE_VALUE; ++ try { ++ compound = this.taskController.readData(this.x, this.z); ++ } catch (final Throwable thr) { ++ if (thr instanceof ThreadDeath) { ++ throw (ThreadDeath) thr; ++ } ++ LOGGER.fatal("Failed to read chunk data for (" + this.x + "," + this.z + ")", thr); ++ // fall through to complete with null data ++ } ++ read.readFuture.complete(compound); ++ } ++ ++ final Long chunkKey = Long.valueOf(IOUtil.getCoordinateKey(this.x, this.z)); ++ ++ ChunkDataController.InProgressWrite write = this.inProgressWrite; ++ ++ if (write == null) { ++ // IntelliJ warns this is invalid, however it does not consider that writes to the task map & the inProgress field can occur concurrently. ++ ChunkDataTask inMap = this.taskController.tasks.compute(chunkKey, (final Long keyInMap, final ChunkDataTask valueInMap) -> { ++ if (valueInMap == null) { ++ throw new IllegalStateException("Write completed concurrently, report this"); ++ } ++ if (valueInMap != ChunkDataTask.this) { ++ throw new IllegalStateException("Chunk task mismatch, report this"); ++ } ++ return valueInMap.inProgressWrite == null ? null : valueInMap; ++ }); ++ ++ if (inMap == null) { ++ return; // set the task value to null, indicating we're done ++ } ++ ++ // not null, which means there was a concurrent write ++ write = this.inProgressWrite; ++ } ++ ++ // check if another process is writing ++ try { ++ this.world.checkSession(); ++ } catch (final ExceptionWorldConflict ex) { ++ LOGGER.fatal("Couldn't save chunk; already in use by another instance of Minecraft?", ex); ++ // we don't need to set the write counter to -1 as we know at this stage there's no point in re-scheduling ++ // writes since they'll fail anyways. ++ write.wrote.complete(ConcreteFileIOThread.FAILURE_VALUE); // Hack - However we need to fail the write ++ return; ++ } ++ ++ for (; ; ) { ++ final long writeCounter; ++ final NBTTagCompound data; ++ ++ //noinspection SynchronizationOnLocalVariableOrMethodParameter ++ synchronized (write) { ++ writeCounter = write.writeCounter; ++ data = write.data; ++ } ++ ++ boolean failedWrite = false; ++ ++ try { ++ this.taskController.writeData(this.x, this.z, data); ++ } catch (final Throwable thr) { ++ if (thr instanceof ThreadDeath) { ++ throw (ThreadDeath) thr; ++ } ++ LOGGER.fatal("Failed to write chunk data for (" + this.x + "," + this.z + ")", thr); ++ failedWrite = true; ++ } ++ ++ boolean finalFailWrite = failedWrite; ++ boolean[] returnFailWrite = new boolean[]{false}; ++ ++ ChunkDataTask inMap = this.taskController.tasks.compute(chunkKey, (final Long keyInMap, final ChunkDataTask valueInMap) -> { ++ if (valueInMap == null) { ++ throw new IllegalStateException("Write completed concurrently, report this"); ++ } ++ if (valueInMap != ChunkDataTask.this) { ++ throw new IllegalStateException("Chunk task mismatch, report this"); ++ } ++ if (finalFailWrite) { ++ if (valueInMap.inProgressWrite.writeCounter == writeCounter) { ++ valueInMap.inProgressWrite.writeCounter = -1L; ++ returnFailWrite[0] = true; ++ } ++ // Hack start ++ valueInMap.inProgressWrite.wrote.complete(ConcreteFileIOThread.FAILURE_VALUE); ++ return valueInMap; ++ } ++ if (valueInMap.inProgressWrite.writeCounter == writeCounter) { ++ valueInMap.inProgressWrite.wrote.complete(data); ++ return null; ++ } ++ return valueInMap; ++ // Hack end ++ }); ++ ++ if (inMap == null || returnFailWrite[0]) { ++ // write counter matched, so we wrote the most up-to-date pending data, we're done here ++ // or we failed to write and successfully set the write counter to -1 ++ return; // we're done here ++ } ++ ++ // fetch & write new data ++ continue; ++ } ++ } ++ } ++} +diff --git a/src/main/java/com/destroystokyo/paper/io/IOUtil.java b/src/main/java/com/destroystokyo/paper/io/IOUtil.java +new file mode 100644 +index 0000000000..7898cab62f +--- /dev/null ++++ b/src/main/java/com/destroystokyo/paper/io/IOUtil.java +@@ -0,0 +1,62 @@ ++package com.destroystokyo.paper.io; ++ ++import org.bukkit.Bukkit; ++ ++public final class IOUtil { ++ ++ /* Copied from concrete or concurrentutil */ ++ ++ public static long getCoordinateKey(final int x, final int z) { ++ return ((long) z << 32) | (x & 0xFFFFFFFFL); ++ } ++ ++ public static int getCoordinateX(final long key) { ++ return (int) key; ++ } ++ ++ public static int getCoordinateZ(final long key) { ++ return (int) (key >>> 32); ++ } ++ ++ public static int getRegionCoordinate(final int chunkCoordinate) { ++ return chunkCoordinate >> 5; ++ } ++ ++ public static int getChunkInRegion(final int chunkCoordinate) { ++ return chunkCoordinate & 31; ++ } ++ ++ public static String genericToString(final Object object) { ++ return object == null ? "null" : object.getClass().getName() + ":" + object.toString(); ++ } ++ ++ public static T notNull(final T obj) { ++ if (obj == null) { ++ throw new NullPointerException(); ++ } ++ return obj; ++ } ++ ++ public static T notNull(final T obj, final String msgIfNull) { ++ if (obj == null) { ++ throw new NullPointerException(msgIfNull); ++ } ++ return obj; ++ } ++ ++ public static void arrayBounds(final int off, final int len, final int arrayLength, final String msgPrefix) { ++ if (off < 0 || len < 0 || (arrayLength - off) < len) { ++ throw new ArrayIndexOutOfBoundsException(msgPrefix + ": off: " + off + ", len: " + len + ", array length: " + arrayLength); ++ } ++ } ++ ++ public static int getPriorityForCurrentThread() { ++ return Bukkit.isPrimaryThread() ? PrioritizedTaskQueue.HIGHEST_PRIORITY : PrioritizedTaskQueue.NORMAL_PRIORITY; ++ } ++ ++ @SuppressWarnings("unchecked") ++ public static void rethrow(final Throwable throwable) throws T { ++ throw (T) throwable; ++ } ++ ++} +\ No newline at end of file +diff --git a/src/main/java/com/destroystokyo/paper/io/PrioritizedTaskQueue.java b/src/main/java/com/destroystokyo/paper/io/PrioritizedTaskQueue.java +new file mode 100644 +index 0000000000..3722991e1b +--- /dev/null ++++ b/src/main/java/com/destroystokyo/paper/io/PrioritizedTaskQueue.java +@@ -0,0 +1,262 @@ ++package com.destroystokyo.paper.io; ++ ++import java.util.concurrent.ConcurrentLinkedQueue; ++import java.util.concurrent.atomic.AtomicBoolean; ++import java.util.concurrent.atomic.AtomicInteger; ++import java.util.concurrent.atomic.AtomicReference; ++ ++public class PrioritizedTaskQueue { ++ ++ // lower numbers are a higher priority (except < 0) ++ // higher priorities are always executed before lower priorities ++ ++ /** ++ * Priority value indicating the task has completed or is being completed. ++ */ ++ public static final int COMPLETING_PRIORITY = -1; ++ ++ /** ++ * Highest priority, should only be used for main thread tasks or tasks that are blocking the main thread. ++ */ ++ public static final int HIGHEST_PRIORITY = 0; ++ ++ /** ++ * Should be only used in an IO task so that chunk loads do not wait on other IO tasks. ++ * This only exists because IO tasks are scheduled before chunk load tasks to decrease IO waiting times. ++ */ ++ public static final int HIGHER_PRIORITY = 1; ++ ++ /** ++ * Should be used for scheduling chunk loads/generation that would increase response times to users. ++ */ ++ public static final int HIGH_PRIORITY = 2; ++ ++ /** ++ * Default priority. ++ */ ++ public static final int NORMAL_PRIORITY = 3; ++ ++ /** ++ * Use for tasks not at all critical and can potentially be delayed. ++ */ ++ public static final int LOW_PRIORITY = 4; ++ ++ /** ++ * Use for tasks that should "eventually" execute. ++ */ ++ public static final int LOWEST_PRIORITY = 5; ++ ++ private static final int TOTAL_PRIORITIES = 6; ++ ++ final ConcurrentLinkedQueue[] queues = (ConcurrentLinkedQueue[]) new ConcurrentLinkedQueue[TOTAL_PRIORITIES]; ++ ++ private final AtomicBoolean shutdown = new AtomicBoolean(); ++ ++ { ++ for (int i = 0; i < TOTAL_PRIORITIES; ++i) { ++ this.queues[i] = new ConcurrentLinkedQueue<>(); ++ } ++ } ++ ++ /** ++ * Returns whether the specified priority is valid ++ */ ++ public static boolean validPriority(final int priority) { ++ return priority >= 0 && priority < TOTAL_PRIORITIES; ++ } ++ ++ /** ++ * Queues a task. ++ * ++ * @throws IllegalStateException If the task has already been queued. Use {@link PrioritizedTask#raisePriority(int)} to ++ * raise a task's priority. ++ * This can also be thrown if the queue has shutdown. ++ */ ++ public void add(final T task) throws IllegalStateException { ++ task.onQueue(this); ++ this.queues[task.getPriority()].add(task); ++ if (this.shutdown.get()) { ++ // note: we're not actually sure at this point if our task will go through ++ throw new IllegalStateException("Queue has shutdown, refusing to execute task " + IOUtil.genericToString(task)); ++ } ++ } ++ ++ /** ++ * Polls the highest priority task currently available. {@code null} if none. ++ */ ++ public T poll() { ++ T task; ++ for (int i = 0; i < TOTAL_PRIORITIES; ++i) { ++ final ConcurrentLinkedQueue queue = this.queues[i]; ++ ++ while ((task = queue.poll()) != null) { ++ final int prevPriority = task.tryComplete(i); ++ if (prevPriority != COMPLETING_PRIORITY && prevPriority <= i) { ++ // if the prev priority was greater-than or equal to our current priority ++ return task; ++ } ++ } ++ } ++ ++ return null; ++ } ++ ++ /** ++ * Prevent further additions to this queue. Attempts to add after this call has completed (potentially during) will ++ * result in {@link IllegalStateException} being thrown. ++ *

    ++ * This operation is atomic with respect to other shutdown calls ++ *

    ++ *

    ++ * After this call has completed, regardless of return value, this queue will be shutdown. ++ *

    ++ * ++ * @return {@code true} if the queue was shutdown, {@code false} if it has shut down already ++ */ ++ public boolean shutdown() { ++ return this.shutdown.getAndSet(false); ++ } ++ ++ public abstract static class PrioritizedTask { ++ ++ protected final AtomicReference queue = new AtomicReference<>(); ++ ++ protected final AtomicInteger priority; ++ ++ protected PrioritizedTask() { ++ this(PrioritizedTaskQueue.NORMAL_PRIORITY); ++ } ++ ++ protected PrioritizedTask(final int priority) { ++ if (!PrioritizedTaskQueue.validPriority(priority)) { ++ throw new IllegalArgumentException("Invalid priority " + priority); ++ } ++ this.priority = new AtomicInteger(priority); ++ } ++ ++ /** ++ * Returns the current priority. Note that {@link PrioritizedTaskQueue#COMPLETING_PRIORITY} will be returned ++ * if this task is completing or has completed. ++ */ ++ public final int getPriority() { ++ return this.priority.get(); ++ } ++ ++ final int tryComplete(final int minPriority) { ++ for (int curr = this.getPriorityVolatile(); ; ) { ++ if (curr == COMPLETING_PRIORITY) { ++ return COMPLETING_PRIORITY; ++ } ++ if (curr > minPriority) { ++ // curr is lower priority ++ return curr; ++ } ++ ++ if (curr == (curr = this.compareAndExchangePriorityVolatile(curr, COMPLETING_PRIORITY))) { ++ return curr; ++ } ++ continue; ++ } ++ } ++ ++ /** ++ * Forces this task to be completed. ++ * ++ * @return {@code true} if the task was cancelled, {@code false} if the task has already completed or is being completed. ++ */ ++ public boolean cancel() { ++ return this.exchangePriorityVolatile(PrioritizedTaskQueue.COMPLETING_PRIORITY) != PrioritizedTaskQueue.COMPLETING_PRIORITY; ++ } ++ ++ /** ++ * Attempts to raise the priority to the priority level specified. ++ * ++ * @param priority Priority specified ++ * @return {@code true} if successful, {@code false} otherwise. ++ */ ++ public boolean raisePriority(final int priority) { ++ if (!PrioritizedTaskQueue.validPriority(priority)) { ++ throw new IllegalArgumentException("Invalid priority"); ++ } ++ ++ final PrioritizedTaskQueue queue = this.queue.get(); ++ ++ if (queue == null) { ++ throw new IllegalStateException("Not queued"); ++ } ++ ++ for (int curr = this.getPriorityVolatile(); ; ) { ++ if (curr == COMPLETING_PRIORITY) { ++ return false; ++ } ++ if (priority >= curr) { ++ return true; ++ } ++ ++ if (curr == (curr = this.compareAndExchangePriorityVolatile(curr, priority))) { ++ //noinspection unchecked ++ queue.queues[priority].add(this); // silently fail on shutdown ++ return true; ++ } ++ continue; ++ } ++ } ++ ++ /** ++ * Attempts to set this task's priority level to the level specified. ++ * ++ * @param priority Specified priority level. ++ * @return {@code true} if successful, {@code false} if this task is completing or has completed. ++ */ ++ public boolean updatePriority(final int priority) { ++ if (!PrioritizedTaskQueue.validPriority(priority)) { ++ throw new IllegalArgumentException("Invalid priority"); ++ } ++ ++ final PrioritizedTaskQueue queue = this.queue.get(); ++ ++ if (queue == null) { ++ throw new IllegalStateException("Not queued"); ++ } ++ ++ for (int curr = this.getPriorityVolatile(); ; ) { ++ if (curr == COMPLETING_PRIORITY) { ++ return false; ++ } ++ if (curr == priority) { ++ return true; ++ } ++ ++ if (curr == (curr = this.compareAndExchangePriorityVolatile(curr, priority))) { ++ //noinspection unchecked ++ queue.queues[priority].add(this); // silently fail on shutdown ++ return true; ++ } ++ continue; ++ } ++ } ++ ++ void onQueue(final PrioritizedTaskQueue queue) { ++ if (this.queue.getAndSet(queue) != null) { ++ throw new IllegalStateException("Already queued!"); ++ } ++ } ++ ++ /* priority */ ++ ++ protected final int getPriorityVolatile() { ++ return this.priority.get(); ++ } ++ ++ protected final int compareAndExchangePriorityVolatile(final int expect, final int update) { ++ if (this.priority.compareAndSet(expect, update)) { ++ return expect; ++ } ++ return this.priority.get(); ++ } ++ ++ protected final int exchangePriorityVolatile(final int value) { ++ return this.priority.getAndSet(value); ++ } ++ } ++} +\ No newline at end of file +diff --git a/src/main/java/com/destroystokyo/paper/io/QueueExecutorThread.java b/src/main/java/com/destroystokyo/paper/io/QueueExecutorThread.java +new file mode 100644 +index 0000000000..1b64cec885 +--- /dev/null ++++ b/src/main/java/com/destroystokyo/paper/io/QueueExecutorThread.java +@@ -0,0 +1,200 @@ ++package com.destroystokyo.paper.io; ++ ++import net.minecraft.server.MinecraftServer; ++import org.apache.logging.log4j.Logger; ++ ++import java.util.concurrent.ConcurrentLinkedQueue; ++import java.util.concurrent.atomic.AtomicBoolean; ++import java.util.concurrent.locks.LockSupport; ++ ++public class QueueExecutorThread extends Thread { ++ ++ private static final Logger LOGGER = MinecraftServer.LOGGER; ++ ++ protected final PrioritizedTaskQueue queue; ++ protected final long spinWaitTime; ++ ++ protected volatile boolean closed; ++ protected final AtomicBoolean parked = new AtomicBoolean(); ++ ++ protected final ConcurrentLinkedQueue flushQueue = new ConcurrentLinkedQueue<>(); ++ protected volatile int flushCounter; ++ ++ public QueueExecutorThread(final PrioritizedTaskQueue queue) { ++ this(queue, (int) (1.e6)); // 1.0ms ++ } ++ ++ public QueueExecutorThread(final PrioritizedTaskQueue queue, final long spinWaitTime) { // in ms ++ this.queue = queue; ++ this.spinWaitTime = spinWaitTime; ++ } ++ ++ @Override ++ public void run() { ++ final long spinWaitTime = this.spinWaitTime; ++ main_loop: ++ for (; ; ) { ++ this.pollTasks(); ++ ++ // spinwait ++ ++ final long start = System.nanoTime(); ++ ++ for (; ; ) { ++ // If we are interrpted for any reason, park() will always return immediately. Clear so that we don't needlessly use cpu in such an event. ++ Thread.interrupted(); ++ LockSupport.parkNanos("Spinwaiting on tasks", 1000L); // 1us ++ ++ if (this.pollTasks()) { ++ continue main_loop; ++ } ++ ++ if (this.handleClose()) { ++ return; // we're done ++ } ++ ++ if ((System.nanoTime() - start) >= spinWaitTime) { ++ break; ++ } ++ } ++ ++ if (this.handleClose()) { ++ return; ++ } ++ ++ this.parked.set(true); ++ // We need to parse here to avoid a race condition where a thread queues a task before we set parked to true ++ if (this.pollTasks()) { ++ this.parked.set(false); ++ continue; ++ } ++ ++ do { ++ Thread.interrupted(); ++ LockSupport.park("Waiting on tasks"); ++ } while (this.parked.get()); ++ } ++ } ++ ++ protected boolean handleClose() { ++ if (this.closed) { ++ // at this stage no task may be queued ++ this.pollTasks(); // this ensures we've emptied the queue ++ this.handleFlushThreads(true); ++ return true; ++ } ++ return false; ++ } ++ ++ protected boolean pollTasks() { ++ Runnable task; ++ boolean ret = false; ++ ++ while ((task = (Runnable) this.queue.poll()) != null) { ++ ret = true; ++ try { ++ task.run(); ++ } catch (final Throwable throwable) { ++ if (throwable instanceof ThreadDeath) { ++ throw (ThreadDeath) throwable; ++ } ++ LOGGER.fatal("Exception thrown from prioritized runnable task in thread " + this.getName() + ": " + IOUtil.genericToString(task), throwable); ++ } ++ } ++ ++ this.handleFlushThreads(false); ++ ++ return ret; ++ } ++ ++ protected void handleFlushThreads(final boolean shutdown) { ++ //noinspection NonAtomicOperationOnVolatileField ++ ++this.flushCounter; ++ ++ Thread current; ++ ++ while ((current = this.flushQueue.poll()) != null) { ++ LockSupport.unpark(current); ++ } ++ ++ } ++ ++ protected void queueTask(final T task) { ++ this.queue.add(task); ++ if (this.parked.get() && this.parked.getAndSet(false)) { ++ LockSupport.unpark(this); ++ } ++ } ++ ++ /** ++ * Waits until this thread's queue is empty. ++ * ++ * @throws IllegalStateException If the current thread is {@code this} thread. ++ */ ++ public void flush() { ++ final Thread currentThread = Thread.currentThread(); ++ ++ if (currentThread == this) { ++ throw new IllegalStateException("Cannot flush the queue executor thread while on the queue executor thread"); ++ } ++ ++ // order is important ++ ++ if (this.closed) { ++ return; ++ } ++ ++ if (this.parked.get()) { ++ return; // no tasks queued ++ } ++ ++ int flushCounter = this.flushCounter; ++ ++ this.flushQueue.add(currentThread); ++ ++ if (this.closed) { ++ return; ++ } ++ ++ // re-check parked status ++ if (this.parked.get()) { ++ return; ++ } ++ ++ // force a response from the IO thread, we're not sure of its state currently ++ this.parked.set(false); ++ LockSupport.unpark(this); ++ ++ boolean interrupted = false; // preserve interrupted status ++ ++ while (this.flushCounter == flushCounter) { ++ interrupted |= Thread.interrupted(); ++ LockSupport.park(); ++ } ++ ++ if (interrupted) { ++ Thread.currentThread().interrupt(); ++ } ++ } ++ ++ /** ++ * Closes this queue executor's queue and optionally waits for it to empty. ++ *

    ++ * If wait is {@code true}, then the queue will be empty by the time this call completes. ++ *

    ++ *

    ++ * This function is MT-Safe. ++ *

    ++ * ++ * @param wait If this call is to wait until the queue is empty ++ * @return whether this thread shut down the queue ++ */ ++ public boolean close(final boolean wait) { ++ boolean ret = this.queue.shutdown(); ++ this.closed = true; ++ if (wait) { ++ this.flush(); ++ } ++ return ret; ++ } ++} +\ No newline at end of file +diff --git a/src/main/java/net/minecraft/server/ChunkProviderServer.java b/src/main/java/net/minecraft/server/ChunkProviderServer.java +index b46285ecdc..f6a6421140 100644 +--- a/src/main/java/net/minecraft/server/ChunkProviderServer.java ++++ b/src/main/java/net/minecraft/server/ChunkProviderServer.java +@@ -286,6 +286,7 @@ public class ChunkProviderServer extends IChunkProvider { + @Nullable + @Override + public IChunkAccess getChunkAt(int i, int j, ChunkStatus chunkstatus, boolean flag) { ++ final int x = i; final int z = j; // Paper - conflict on variable change + if (Thread.currentThread() != this.serverThread) { + return (IChunkAccess) CompletableFuture.supplyAsync(() -> { + return this.getChunkAt(i, j, chunkstatus, flag); +@@ -307,6 +308,10 @@ public class ChunkProviderServer extends IChunkProvider { + CompletableFuture> completablefuture = this.getChunkFutureMainThread(i, j, chunkstatus, flag); + + if (!completablefuture.isDone()) { // Paper ++ // Paper start - async chunk io ++ com.destroystokyo.paper.io.ConcreteFileIOThread.Holder.INSTANCE.bumpPriority(this.world, x, z, ++ com.destroystokyo.paper.io.PrioritizedTaskQueue.HIGHEST_PRIORITY); ++ // Paper end + this.world.timings.chunkAwait.startTiming(); // Paper + this.serverThreadQueue.awaitTasks(completablefuture::isDone); + this.world.timings.chunkAwait.stopTiming(); // Paper +diff --git a/src/main/java/net/minecraft/server/MinecraftServer.java b/src/main/java/net/minecraft/server/MinecraftServer.java +index 0324a90ca5..430cd70cf5 100644 +--- a/src/main/java/net/minecraft/server/MinecraftServer.java ++++ b/src/main/java/net/minecraft/server/MinecraftServer.java +@@ -764,6 +764,7 @@ public abstract class MinecraftServer extends IAsyncTaskHandlerReentrant executor; + public final ChunkGenerator chunkGenerator; +- private final Supplier m; ++ private final Supplier m; private final Supplier getWorldPersistentDataSupplier() { return this.m; } // Paper - OBFHELPER + private final VillagePlace n; + public final LongSet unloadQueue; + private boolean updatingChunksModified; +@@ -101,7 +101,7 @@ public class PlayerChunkMap extends IChunkLoader implements PlayerChunk.d { + this.lightEngine = new LightEngineThreaded(ilightaccess, this, this.world.getWorldProvider().g(), threadedmailbox2, this.q.a(threadedmailbox2, false)); + this.u = new PlayerChunkMap.a(executor, iasynctaskhandler); + this.m = supplier; +- this.n = new VillagePlace(new File(this.x, "poi"), datafixer); ++ this.n = new VillagePlace(new File(this.x, "poi"), datafixer, this.world); // Paper + this.setViewDistance(i); + } + +@@ -261,6 +261,7 @@ public class PlayerChunkMap extends IChunkLoader implements PlayerChunk.d { + @Override + public void close() throws IOException { + this.q.close(); ++ com.destroystokyo.paper.io.ConcreteFileIOThread.Holder.INSTANCE.flush(); // Paper - Required since we're closing regionfiles in the next line + this.n.close(); + super.close(); + } +@@ -308,7 +309,7 @@ public class PlayerChunkMap extends IChunkLoader implements PlayerChunk.d { + shouldSave = ((Chunk) ichunkaccess).lastSaved + world.paperConfig.autoSavePeriod <= world.getTime(); + } + +- if (shouldSave && this.saveChunk(ichunkaccess)) { ++ if (shouldSave && this.saveChunk(ichunkaccess, true)) { // Paper - async chunk io + ++savedThisTick; + playerchunk.m(); + } +@@ -386,7 +387,7 @@ public class PlayerChunkMap extends IChunkLoader implements PlayerChunk.d { + ((Chunk) ichunkaccess).setLoaded(false); + } + +- this.saveChunk(ichunkaccess); ++ this.saveChunk(ichunkaccess, true); // Paper - async chunk io + if (this.loadedChunks.remove(i) && ichunkaccess instanceof Chunk) { + Chunk chunk = (Chunk) ichunkaccess; + +@@ -462,13 +463,22 @@ public class PlayerChunkMap extends IChunkLoader implements PlayerChunk.d { + } + } + ++ // Paper start - Async chunk io ++ public NBTTagCompound completeChunkData(NBTTagCompound compound, ChunkCoordIntPair chunkcoordintpair) throws IOException { ++ return compound == null ? null : this.getChunkData(this.world.getWorldProvider().getDimensionManager(), this.getWorldPersistentDataSupplier(), compound, chunkcoordintpair, this.world); ++ } ++ // Paper end ++ + private CompletableFuture> f(ChunkCoordIntPair chunkcoordintpair) { +- return CompletableFuture.supplyAsync(() -> { ++ // Paper start - Async chunk io ++ final java.util.function.BiFunction> callable = (chunkData, ioThrowable) -> { + try (Timing ignored = this.world.timings.syncChunkLoadTimer.startTimingIfSync()) { // Paper +- NBTTagCompound nbttagcompound; // Paper +- try (Timing ignored2 = this.world.timings.chunkIOStage1.startTimingIfSync()) { // Paper +- nbttagcompound = this.readChunkData(chunkcoordintpair); ++ if (ioThrowable != null) { ++ com.destroystokyo.paper.io.IOUtil.rethrow(ioThrowable); + } ++ this.n.loadInData(chunkcoordintpair, chunkData.poiData); ++ NBTTagCompound nbttagcompound = this.completeChunkData(chunkData.chunkData, chunkcoordintpair); ++ // Paper end + + if (nbttagcompound != null) { + boolean flag = nbttagcompound.hasKeyOfType("Level", 10) && nbttagcompound.getCompound("Level").hasKeyOfType("Status", 8); +@@ -495,7 +505,24 @@ public class PlayerChunkMap extends IChunkLoader implements PlayerChunk.d { + } + + return Either.left(new ProtoChunk(chunkcoordintpair, ChunkConverter.a, this.world)); // Paper - Anti-Xray +- }, this.executor); ++ // Paper start - Async chunk io ++ }; ++ CompletableFuture> ret = new CompletableFuture<>(); ++ com.destroystokyo.paper.io.ConcreteFileIOThread.Holder.INSTANCE.loadChunkDataAsync(this.world, chunkcoordintpair.x, chunkcoordintpair.z, ++ com.destroystokyo.paper.io.PrioritizedTaskQueue.NORMAL_PRIORITY, (chunkData) -> { ++ PlayerChunkMap.this.executor.execute(() -> { ++ Throwable throwable = null; ++ ++ if (chunkData.chunkData == com.destroystokyo.paper.io.ConcreteFileIOThread.FAILURE_VALUE || ++ chunkData.poiData == com.destroystokyo.paper.io.ConcreteFileIOThread.FAILURE_VALUE) { ++ throwable = new Throwable("See log from the file io thread above"); ++ } ++ ++ ret.complete(callable.apply(throwable != null ? null : chunkData, throwable)); ++ }); ++ }, true, true, false); ++ return ret; ++ // Paper end + } + + private CompletableFuture> b(PlayerChunk playerchunk, ChunkStatus chunkstatus) { +@@ -701,18 +728,40 @@ public class PlayerChunkMap extends IChunkLoader implements PlayerChunk.d { + return this.v.get(); + } + ++ // Paper start - async chunk io ++ private boolean writeDataAsync(ChunkCoordIntPair chunkPos, NBTTagCompound poiData, NBTTagCompound chunkData, boolean async) { ++ com.destroystokyo.paper.io.ConcreteFileIOThread.Holder.INSTANCE.scheduleSave(this.world, chunkPos.x, chunkPos.z, ++ poiData, chunkData, !async ? com.destroystokyo.paper.io.PrioritizedTaskQueue.HIGHEST_PRIORITY : com.destroystokyo.paper.io.PrioritizedTaskQueue.LOW_PRIORITY); ++ ++ if (async) { ++ return true; ++ } ++ Boolean successPoi = com.destroystokyo.paper.io.ConcreteFileIOThread.Holder.INSTANCE.waitForIOToComplete(this.world, chunkPos.x, chunkPos.z, true, true); ++ Boolean successChunk = com.destroystokyo.paper.io.ConcreteFileIOThread.Holder.INSTANCE.waitForIOToComplete(this.world, chunkPos.x, chunkPos.z, true, false); ++ ++ if (successPoi == Boolean.FALSE || successChunk == Boolean.FALSE) { ++ return false; ++ } ++ ++ // null indicates no task existed, which means our write completed before we waited on it ++ ++ return true; ++ } ++ // Paper end ++ + public boolean saveChunk(IChunkAccess ichunkaccess) { +- this.n.a(ichunkaccess.getPos()); ++ // Paper start - async param ++ return this.saveChunk(ichunkaccess, false); ++ } ++ ++ public boolean saveChunk(IChunkAccess ichunkaccess, boolean async) { ++ NBTTagCompound poiData = this.getVillagePlace().getData(ichunkaccess.getPos()); // Paper ++ //this.n.a(ichunkaccess.getPos()); // Delay ++ // Paper end + if (!ichunkaccess.isNeedsSaving()) { + return false; + } else { +- try { +- this.world.checkSession(); +- } catch (ExceptionWorldConflict exceptionworldconflict) { +- PlayerChunkMap.LOGGER.error("Couldn't save chunk; already in use by another instance of Minecraft?", exceptionworldconflict); +- com.destroystokyo.paper.exception.ServerInternalException.reportInternalException(exceptionworldconflict); // Paper +- return false; +- } ++ // Paper - The save session check is performed on the IO thread + + ichunkaccess.setLastSaved(this.world.getTime()); + ichunkaccess.setNeedsSaving(false); +@@ -724,20 +773,22 @@ public class PlayerChunkMap extends IChunkLoader implements PlayerChunk.d { + + if (chunkstatus.getType() != ChunkStatus.Type.LEVELCHUNK) { + // Paper start - Optimize save by using status cache +- ChunkStatus statusOnDisk = this.getRegionFile(ichunkaccess.getPos(), false).getStatus(ichunkaccess.getPos().x, ichunkaccess.getPos().z, this); ++ ChunkStatus statusOnDisk = this.getChunkStatus(chunkcoordintpair, true); // Paper - Async chunk io + if (statusOnDisk != null && statusOnDisk.getType() == ChunkStatus.Type.LEVELCHUNK) { + // Paper end ++ this.writeDataAsync(ichunkaccess.getPos(), poiData, null, async); // Paper - Async chunk io + return false; + } + + if (chunkstatus == ChunkStatus.EMPTY && ichunkaccess.h().values().stream().noneMatch(StructureStart::e)) { ++ this.writeDataAsync(ichunkaccess.getPos(), poiData, null, async); // Paper - Async chunk io + return false; + } + } + + nbttagcompound = ChunkRegionLoader.saveChunk(this.world, ichunkaccess); +- this.write(chunkcoordintpair, nbttagcompound); +- return true; ++ return this.writeDataAsync(ichunkaccess.getPos(), poiData, nbttagcompound, async); // Paper - Async chunk io ++ //return true; // Paper + } catch (Exception exception) { + PlayerChunkMap.LOGGER.error("Failed to save chunk {},{}", chunkcoordintpair.x, chunkcoordintpair.z, exception); + com.destroystokyo.paper.exception.ServerInternalException.reportInternalException(exception); // Paper +@@ -808,6 +859,62 @@ public class PlayerChunkMap extends IChunkLoader implements PlayerChunk.d { + return Iterables.unmodifiableIterable(this.visibleChunks.values()); + } + ++ // Paper start - Asynchronous chunk io ++ @Nullable ++ @Override ++ public NBTTagCompound read(ChunkCoordIntPair chunkcoordintpair) throws IOException { ++ if (Thread.currentThread() != com.destroystokyo.paper.io.ConcreteFileIOThread.Holder.INSTANCE) { ++ NBTTagCompound ret = com.destroystokyo.paper.io.ConcreteFileIOThread.Holder.INSTANCE ++ .loadChunkDataAsyncFuture(this.world, chunkcoordintpair.x, chunkcoordintpair.z, com.destroystokyo.paper.io.IOUtil.getPriorityForCurrentThread(), ++ false, true, true).join().chunkData; ++ ++ if (ret == com.destroystokyo.paper.io.ConcreteFileIOThread.FAILURE_VALUE) { ++ throw new IOException("See logs for further detail"); ++ } ++ return ret; ++ } ++ return super.read(chunkcoordintpair); ++ } ++ ++ @Override ++ public void write(ChunkCoordIntPair chunkcoordintpair, NBTTagCompound nbttagcompound) throws IOException { ++ if (Thread.currentThread() != com.destroystokyo.paper.io.ConcreteFileIOThread.Holder.INSTANCE) { ++ com.destroystokyo.paper.io.ConcreteFileIOThread.Holder.INSTANCE.scheduleSave( ++ this.world, chunkcoordintpair.x, chunkcoordintpair.z, null, nbttagcompound, ++ com.destroystokyo.paper.io.IOUtil.getPriorityForCurrentThread()); ++ ++ Boolean ret = com.destroystokyo.paper.io.ConcreteFileIOThread.Holder.INSTANCE.waitForIOToComplete(this.world, ++ chunkcoordintpair.x, chunkcoordintpair.z, true, false); ++ ++ if (ret == Boolean.FALSE) { ++ throw new IOException("See logs for further detail"); ++ } ++ return; ++ } ++ super.write(chunkcoordintpair, nbttagcompound); ++ } ++ ++ public ChunkStatus getChunkStatus(ChunkCoordIntPair chunkPos, boolean load) throws IOException { ++ synchronized (this) { ++ // we enter a synchronized block here so that we do not potentially use a closed regionfile ++ RegionFile regionFile = this.getRegionFile(chunkPos, false); ++ ChunkStatus status = regionFile.getStatusIfCached(chunkPos.x, chunkPos.z); ++ ++ if (!load || status != null || !regionFile.chunkExists(chunkPos)) { ++ return status; ++ } ++ ++ // at this stage we need to load chunk data, however it's best we do that outside of the synchronized block ++ } ++ ++ NBTTagCompound compound = this.readChunkData(chunkPos); ++ ++ // In order to avoid a race condition where a regionfile is re-loaded concurrently we directly use the status in ++ // the returned compound. readChunkData will update the regionfile ++ return compound == null ? null : ChunkRegionLoader.getStatus(compound); ++ } ++ // Paper end ++ + @Nullable + public NBTTagCompound readChunkData(ChunkCoordIntPair chunkcoordintpair) throws IOException { // Paper - private -> public + NBTTagCompound nbttagcompound = this.read(chunkcoordintpair); +@@ -822,7 +929,9 @@ public class PlayerChunkMap extends IChunkLoader implements PlayerChunk.d { + return null; + } + +- this.getRegionFile(chunkcoordintpair, false).setStatus(chunkcoordintpair.x, chunkcoordintpair.z, ChunkRegionLoader.getStatus(nbttagcompound)); ++ synchronized (this) { // Async chunk io - Synchronize so we do not potentially get and use a closed region file ++ this.getRegionFile(chunkcoordintpair, false).setStatus(chunkcoordintpair.x, chunkcoordintpair.z, ChunkRegionLoader.getStatus(nbttagcompound)); ++ } + + return nbttagcompound; + // Paper end +@@ -1165,6 +1274,7 @@ public class PlayerChunkMap extends IChunkLoader implements PlayerChunk.d { + + } + ++ public VillagePlace getVillagePlace() { return this.h(); } // Paper - OBFHELPER + protected VillagePlace h() { + return this.n; + } +diff --git a/src/main/java/net/minecraft/server/RegionFile.java b/src/main/java/net/minecraft/server/RegionFile.java +index 18f218e971..2f10152404 100644 +--- a/src/main/java/net/minecraft/server/RegionFile.java ++++ b/src/main/java/net/minecraft/server/RegionFile.java +@@ -55,20 +55,7 @@ public class RegionFile implements AutoCloseable { + } + + public ChunkStatus getStatus(int x, int z, PlayerChunkMap playerChunkMap) throws IOException { +- if (this.closed) { +- // We've used an invalid region file. +- throw new java.io.EOFException("RegionFile is closed"); +- } +- ChunkCoordIntPair chunkPos = new ChunkCoordIntPair(x, z); +- int location = this.getChunkLocation(chunkPos); +- ChunkStatus cached = this.statuses[location]; +- if (cached != null) { +- return cached; +- } +- +- playerChunkMap.readChunkData(chunkPos); // This will set our status (yes it's disgusting) +- +- return this.statuses[location]; ++ return playerChunkMap.getChunkStatus(new ChunkCoordIntPair(x, z), true); + } + // Paper end + +@@ -354,7 +341,7 @@ public class RegionFile implements AutoCloseable { + this.writeInt(i); // Paper - Avoid 3 io write calls + } + +- public void close() throws IOException { ++ public synchronized void close() throws IOException { // Paper - synchronize + this.closed = true; // Paper + this.b.close(); + } +diff --git a/src/main/java/net/minecraft/server/RegionFileCache.java b/src/main/java/net/minecraft/server/RegionFileCache.java +index e0fdf5f90f..d9283b36b6 100644 +--- a/src/main/java/net/minecraft/server/RegionFileCache.java ++++ b/src/main/java/net/minecraft/server/RegionFileCache.java +@@ -51,13 +51,13 @@ public abstract class RegionFileCache implements AutoCloseable { + } + + // Paper start +- public RegionFile getRegionFileIfLoaded(ChunkCoordIntPair chunkcoordintpair) { ++ public synchronized RegionFile getRegionFileIfLoaded(ChunkCoordIntPair chunkcoordintpair) { // Paper - synchronize for async io + return this.cache.getAndMoveToFirst(ChunkCoordIntPair.pair(chunkcoordintpair.getRegionX(), chunkcoordintpair.getRegionZ())); + } + // Paper end + + public RegionFile getRegionFile(ChunkCoordIntPair chunkcoordintpair, boolean existingOnly) throws IOException { return this.a(chunkcoordintpair, existingOnly); } // Paper - OBFHELPER +- private RegionFile a(ChunkCoordIntPair chunkcoordintpair, boolean existingOnly) throws IOException { // CraftBukkit ++ private synchronized RegionFile a(ChunkCoordIntPair chunkcoordintpair, boolean existingOnly) throws IOException { // CraftBukkit // Paper - synchronize for async io + long i = ChunkCoordIntPair.pair(chunkcoordintpair.getRegionX(), chunkcoordintpair.getRegionZ()); + RegionFile regionfile = (RegionFile) this.cache.getAndMoveToFirst(i); + +@@ -344,7 +344,7 @@ public abstract class RegionFileCache implements AutoCloseable { + } + + // CraftBukkit start +- public boolean chunkExists(ChunkCoordIntPair pos) throws IOException { ++ public synchronized boolean chunkExists(ChunkCoordIntPair pos) throws IOException { // Paper - synchronize + copyIfNeeded(pos.x, pos.z); // Paper + RegionFile regionfile = a(pos, true); + +diff --git a/src/main/java/net/minecraft/server/RegionFileSection.java b/src/main/java/net/minecraft/server/RegionFileSection.java +index a343a7b31d..570b01a6cb 100644 +--- a/src/main/java/net/minecraft/server/RegionFileSection.java ++++ b/src/main/java/net/minecraft/server/RegionFileSection.java +@@ -24,7 +24,7 @@ public class RegionFileSection extends RegionFi + + private static final Logger LOGGER = LogManager.getLogger(); + private final Long2ObjectMap> b = new Long2ObjectOpenHashMap(); +- private final LongLinkedOpenHashSet d = new LongLinkedOpenHashSet(); ++ protected final LongLinkedOpenHashSet d = new LongLinkedOpenHashSet(); // Paper + private final BiFunction, R> e; + private final Function f; + private final DataFixer g; +@@ -39,8 +39,8 @@ public class RegionFileSection extends RegionFi + } + + protected void a(BooleanSupplier booleansupplier) { +- while (!this.d.isEmpty() && booleansupplier.getAsBoolean()) { +- ChunkCoordIntPair chunkcoordintpair = SectionPosition.a(this.d.firstLong()).u(); ++ while (!this.d.isEmpty() && booleansupplier.getAsBoolean()) { // Paper - conflict here to avoid obfhelpers ++ ChunkCoordIntPair chunkcoordintpair = SectionPosition.a(this.d.firstLong()).u(); // Paper - conflict here to avoid obfhelpers + + this.d(chunkcoordintpair); + } +@@ -82,9 +82,9 @@ public class RegionFileSection extends RegionFi + Optional optional = this.d(i); + + if (optional.isPresent()) { +- return (MinecraftSerializable) optional.get(); ++ return optional.get(); // Paper - decompile fix + } else { +- R r0 = (MinecraftSerializable) this.f.apply(() -> { ++ R r0 = this.f.apply(() -> { // Paper - decompile fix + this.a(i); + }); + +@@ -94,7 +94,13 @@ public class RegionFileSection extends RegionFi + } + + private void b(ChunkCoordIntPair chunkcoordintpair) { +- this.a(chunkcoordintpair, DynamicOpsNBT.a, this.c(chunkcoordintpair)); ++ // Paper start - load data in function ++ this.loadInData(chunkcoordintpair, this.c(chunkcoordintpair)); ++ } ++ ++ public void loadInData(ChunkCoordIntPair chunkPos, NBTTagCompound compound) { ++ this.a(chunkPos, DynamicOpsNBT.a, compound); ++ // Paper end + } + + @Nullable +@@ -123,7 +129,7 @@ public class RegionFileSection extends RegionFi + for (int l = 0; l < 16; ++l) { + long i1 = SectionPosition.a(chunkcoordintpair, l).v(); + Optional optional = optionaldynamic.get(Integer.toString(l)).get().map((dynamic2) -> { +- return (MinecraftSerializable) this.e.apply(() -> { ++ return this.e.apply(() -> { // Paper - decompile fix + this.a(i1); + }, dynamic2); + }); +@@ -142,7 +148,7 @@ public class RegionFileSection extends RegionFi + } + + private void d(ChunkCoordIntPair chunkcoordintpair) { +- Dynamic dynamic = this.a(chunkcoordintpair, DynamicOpsNBT.a); ++ Dynamic dynamic = this.a(chunkcoordintpair, DynamicOpsNBT.a); // Paper - conflict here to avoid adding obfhelpers :) + NBTBase nbtbase = (NBTBase) dynamic.getValue(); + + if (nbtbase instanceof NBTTagCompound) { +@@ -157,6 +163,20 @@ public class RegionFileSection extends RegionFi + + } + ++ // Paper start - internal get data function, copied from above ++ private NBTTagCompound getDataInternal(ChunkCoordIntPair chunkcoordintpair) { ++ Dynamic dynamic = this.a(chunkcoordintpair, DynamicOpsNBT.a); ++ NBTBase nbtbase = (NBTBase) dynamic.getValue(); ++ ++ if (nbtbase instanceof NBTTagCompound) { ++ return (NBTTagCompound)nbtbase; ++ } else { ++ RegionFileSection.LOGGER.error("Expected compound tag, got {}", nbtbase); ++ } ++ return null; ++ } ++ // Paper end ++ + private Dynamic a(ChunkCoordIntPair chunkcoordintpair, DynamicOps dynamicops) { + Map map = Maps.newHashMap(); + +@@ -193,9 +213,9 @@ public class RegionFileSection extends RegionFi + public void a(ChunkCoordIntPair chunkcoordintpair) { + if (!this.d.isEmpty()) { + for (int i = 0; i < 16; ++i) { +- long j = SectionPosition.a(chunkcoordintpair, i).v(); ++ long j = SectionPosition.a(chunkcoordintpair, i).v(); // Paper - conflict here to avoid obfhelpers + +- if (this.d.contains(j)) { ++ if (this.d.contains(j)) { // Paper - conflict here to avoid obfhelpers + this.d(chunkcoordintpair); + return; + } +@@ -203,4 +223,21 @@ public class RegionFileSection extends RegionFi + } + + } ++ ++ // Paper start - get data function ++ public NBTTagCompound getData(ChunkCoordIntPair chunkcoordintpair) { ++ // Note: Copied from above ++ // This is checking if the data exists, then it builds it later in getDataInternal(ChunkCoordIntPair) ++ if (!this.d.isEmpty()) { ++ for (int i = 0; i < 16; ++i) { ++ long j = SectionPosition.a(chunkcoordintpair, i).v(); ++ ++ if (this.d.contains(j)) { ++ return this.getDataInternal(chunkcoordintpair); ++ } ++ } ++ } ++ return null; ++ } ++ // Paper end + } +diff --git a/src/main/java/net/minecraft/server/VillagePlace.java b/src/main/java/net/minecraft/server/VillagePlace.java +index 7bc473e1ef..9f4b1b4c49 100644 +--- a/src/main/java/net/minecraft/server/VillagePlace.java ++++ b/src/main/java/net/minecraft/server/VillagePlace.java +@@ -20,8 +20,17 @@ public class VillagePlace extends RegionFileSection { + + private final VillagePlace.a a = new VillagePlace.a(); + ++ private final WorldServer world; // Paper ++ + public VillagePlace(File file, DataFixer datafixer) { ++ // Paper start ++ this(file, datafixer, null); ++ } ++ ++ public VillagePlace(File file, DataFixer datafixer, WorldServer world) { ++ // Paper end + super(file, VillagePlaceSection::new, VillagePlaceSection::new, datafixer, DataFixTypes.POI_CHUNK); ++ this.world = world; // Paper + } + + public void a(BlockPosition blockposition, VillagePlaceType villageplacetype) { +@@ -128,7 +137,20 @@ public class VillagePlace extends RegionFileSection { + + @Override + public void a(BooleanSupplier booleansupplier) { +- super.a(booleansupplier); ++ // Paper start - async chunk io ++ if (this.world == null) { ++ super.a(booleansupplier); ++ } else { ++ //super.a(booleansupplier); // re-implement below ++ while (!((RegionFileSection)this).d.isEmpty() && booleansupplier.getAsBoolean()) { ++ ChunkCoordIntPair chunkcoordintpair = SectionPosition.a(((RegionFileSection)this).d.firstLong()).u(); ++ ++ NBTTagCompound data = this.getData(chunkcoordintpair); ++ com.destroystokyo.paper.io.ConcreteFileIOThread.Holder.INSTANCE.scheduleSave(this.world, ++ chunkcoordintpair.x, chunkcoordintpair.z, data, null, com.destroystokyo.paper.io.PrioritizedTaskQueue.LOW_PRIORITY); ++ } ++ } ++ // Paper end + this.a.a(); + } + +@@ -164,7 +186,7 @@ public class VillagePlace extends RegionFileSection { + } + + private static boolean a(ChunkSection chunksection) { +- Stream stream = VillagePlaceType.e(); ++ Stream stream = VillagePlaceType.e(); // Paper - decompile fix + + chunksection.getClass(); + return stream.anyMatch(chunksection::a); +@@ -214,6 +236,42 @@ public class VillagePlace extends RegionFileSection { + } + } + ++ // Paper start - Asynchronous chunk io ++ @javax.annotation.Nullable ++ @Override ++ public NBTTagCompound read(ChunkCoordIntPair chunkcoordintpair) throws java.io.IOException { ++ if (this.world != null && Thread.currentThread() != com.destroystokyo.paper.io.ConcreteFileIOThread.Holder.INSTANCE) { ++ NBTTagCompound ret = com.destroystokyo.paper.io.ConcreteFileIOThread.Holder.INSTANCE ++ .loadChunkDataAsyncFuture(this.world, chunkcoordintpair.x, chunkcoordintpair.z, com.destroystokyo.paper.io.IOUtil.getPriorityForCurrentThread(), ++ true, false, true).join().poiData; ++ ++ if (ret == com.destroystokyo.paper.io.ConcreteFileIOThread.FAILURE_VALUE) { ++ throw new java.io.IOException("See logs for further detail"); ++ } ++ return ret; ++ } ++ return super.read(chunkcoordintpair); ++ } ++ ++ @Override ++ public void write(ChunkCoordIntPair chunkcoordintpair, NBTTagCompound nbttagcompound) throws java.io.IOException { ++ if (this.world != null && Thread.currentThread() != com.destroystokyo.paper.io.ConcreteFileIOThread.Holder.INSTANCE) { ++ com.destroystokyo.paper.io.ConcreteFileIOThread.Holder.INSTANCE.scheduleSave( ++ this.world, chunkcoordintpair.x, chunkcoordintpair.z, nbttagcompound, null, ++ com.destroystokyo.paper.io.IOUtil.getPriorityForCurrentThread()); ++ ++ Boolean ret = com.destroystokyo.paper.io.ConcreteFileIOThread.Holder.INSTANCE.waitForIOToComplete(this.world, ++ chunkcoordintpair.x, chunkcoordintpair.z, true, true); ++ ++ if (ret == Boolean.FALSE) { ++ throw new java.io.IOException("See logs for further detail"); ++ } ++ return; ++ } ++ super.write(chunkcoordintpair, nbttagcompound); ++ } ++ // Paper end ++ + public static enum Occupancy { + + HAS_SPACE(VillagePlaceRecord::d), IS_OCCUPIED(VillagePlaceRecord::e), ANY((villageplacerecord) -> { +@@ -222,7 +280,7 @@ public class VillagePlace extends RegionFileSection { + + private final Predicate d; + +- private Occupancy(Predicate predicate) { ++ private Occupancy(Predicate predicate) { // Paper - decompile fix + this.d = predicate; + } + +diff --git a/src/main/java/net/minecraft/server/WorldServer.java b/src/main/java/net/minecraft/server/WorldServer.java +index b5c902e1de..7f7b2a539a 100644 +--- a/src/main/java/net/minecraft/server/WorldServer.java ++++ b/src/main/java/net/minecraft/server/WorldServer.java +@@ -75,6 +75,78 @@ public class WorldServer extends World { + return new Throwable(entity + " Added to world at " + new java.util.Date()); + } + ++ // Paper start - Asynchronous IO ++ public final com.destroystokyo.paper.io.ConcreteFileIOThread.ChunkDataController poiDataController = new com.destroystokyo.paper.io.ConcreteFileIOThread.ChunkDataController() { ++ @Override ++ public void writeData(int x, int z, NBTTagCompound compound) throws java.io.IOException { ++ WorldServer.this.getChunkProvider().playerChunkMap.getVillagePlace().write(new ChunkCoordIntPair(x, z), compound); ++ } ++ ++ @Override ++ public NBTTagCompound readData(int x, int z) throws java.io.IOException { ++ return WorldServer.this.getChunkProvider().playerChunkMap.getVillagePlace().read(new ChunkCoordIntPair(x, z)); ++ } ++ ++ @Override ++ public T computeForRegionFile(int chunkX, int chunkZ, java.util.function.Function function) { ++ synchronized (WorldServer.this.getChunkProvider().playerChunkMap.getVillagePlace()) { ++ RegionFile file; ++ ++ try { ++ file = WorldServer.this.getChunkProvider().playerChunkMap.getVillagePlace().getRegionFile(new ChunkCoordIntPair(chunkX, chunkZ), false); ++ } catch (java.io.IOException ex) { ++ throw new RuntimeException(ex); ++ } ++ ++ return function.apply(file); ++ } ++ } ++ ++ @Override ++ public T computeForRegionFileIfLoaded(int chunkX, int chunkZ, java.util.function.Function function) { ++ synchronized (WorldServer.this.getChunkProvider().playerChunkMap.getVillagePlace()) { ++ RegionFile file = WorldServer.this.getChunkProvider().playerChunkMap.getVillagePlace().getRegionFileIfLoaded(new ChunkCoordIntPair(chunkX, chunkZ)); ++ return function.apply(file); ++ } ++ } ++ }; ++ ++ public final com.destroystokyo.paper.io.ConcreteFileIOThread.ChunkDataController chunkDataController = new com.destroystokyo.paper.io.ConcreteFileIOThread.ChunkDataController() { ++ @Override ++ public void writeData(int x, int z, NBTTagCompound compound) throws java.io.IOException { ++ WorldServer.this.getChunkProvider().playerChunkMap.write(new ChunkCoordIntPair(x, z), compound); ++ } ++ ++ @Override ++ public NBTTagCompound readData(int x, int z) throws java.io.IOException { ++ return WorldServer.this.getChunkProvider().playerChunkMap.read(new ChunkCoordIntPair(x, z)); ++ } ++ ++ @Override ++ public T computeForRegionFile(int chunkX, int chunkZ, java.util.function.Function function) { ++ synchronized (WorldServer.this.getChunkProvider().playerChunkMap) { ++ RegionFile file; ++ ++ try { ++ file = WorldServer.this.getChunkProvider().playerChunkMap.getRegionFile(new ChunkCoordIntPair(chunkX, chunkZ), false); ++ } catch (java.io.IOException ex) { ++ throw new RuntimeException(ex); ++ } ++ ++ return function.apply(file); ++ } ++ } ++ ++ @Override ++ public T computeForRegionFileIfLoaded(int chunkX, int chunkZ, java.util.function.Function function) { ++ synchronized (WorldServer.this.getChunkProvider().playerChunkMap) { ++ RegionFile file = WorldServer.this.getChunkProvider().playerChunkMap.getRegionFileIfLoaded(new ChunkCoordIntPair(chunkX, chunkZ)); ++ return function.apply(file); ++ } ++ } ++ }; ++ // Paper end ++ + // Add env and gen to constructor + public WorldServer(MinecraftServer minecraftserver, Executor executor, WorldNBTStorage worldnbtstorage, WorldData worlddata, DimensionManager dimensionmanager, GameProfilerFiller gameprofilerfiller, WorldLoadListener worldloadlistener, org.bukkit.World.Environment env, org.bukkit.generator.ChunkGenerator gen) { + super(worlddata, dimensionmanager, (world, worldprovider) -> { +-- +2.20.1 +