mirror of
https://github.com/PurpurMC/Purpur.git
synced 2026-02-17 16:37:43 +01:00
Upstream has released updates that appears to apply and compile correctly Paper Changes: a41d51f0 ServerExceptionEvent can be fired off main 920ad7c7 Use getChunkIfLoadedImmediately in places (#2297) 0708fa36 Updated Upstream (CraftBukkit/Spigot)
2775 lines
122 KiB
Diff
2775 lines
122 KiB
Diff
From c4a6f06f03a00f7c612b574ea996db5218711cd3 Mon Sep 17 00:00:00 2001
|
|
From: Spottedleaf <Spottedleaf@users.noreply.github.com>
|
|
Date: Sat, 13 Jul 2019 09:23:10 -0700
|
|
Subject: [PATCH] Asynchronous chunk IO and loading
|
|
|
|
---
|
|
.../co/aikar/timings/WorldTimingsHandler.java | 14 +
|
|
.../ChunkPacketBlockControllerAntiXray.java | 43 +-
|
|
.../paper/io/ConcreteFileIOThread.java | 661 ++++++++++++++++++
|
|
.../com/destroystokyo/paper/io/IOUtil.java | 62 ++
|
|
.../paper/io/PrioritizedTaskQueue.java | 258 +++++++
|
|
.../paper/io/QueueExecutorThread.java | 216 ++++++
|
|
.../paper/io/chunk/ChunkLoadTask.java | 104 +++
|
|
.../paper/io/chunk/ChunkLoadTaskManager.java | 119 ++++
|
|
.../minecraft/server/ChunkProviderServer.java | 135 ++++
|
|
.../minecraft/server/ChunkRegionLoader.java | 51 +-
|
|
.../net/minecraft/server/ChunkStatus.java | 1 +
|
|
.../net/minecraft/server/IChunkLoader.java | 29 +-
|
|
.../java/net/minecraft/server/MCUtil.java | 5 +
|
|
.../net/minecraft/server/MinecraftServer.java | 1 +
|
|
.../net/minecraft/server/PlayerChunkMap.java | 204 +++++-
|
|
.../java/net/minecraft/server/RegionFile.java | 2 +-
|
|
.../net/minecraft/server/RegionFileCache.java | 6 +-
|
|
.../minecraft/server/RegionFileSection.java | 56 +-
|
|
.../java/net/minecraft/server/TicketType.java | 1 +
|
|
.../net/minecraft/server/VillagePlace.java | 66 +-
|
|
.../net/minecraft/server/WorldServer.java | 74 ++
|
|
.../org/bukkit/craftbukkit/CraftWorld.java | 36 +-
|
|
22 files changed, 2066 insertions(+), 78 deletions(-)
|
|
create mode 100644 src/main/java/com/destroystokyo/paper/io/ConcreteFileIOThread.java
|
|
create mode 100644 src/main/java/com/destroystokyo/paper/io/IOUtil.java
|
|
create mode 100644 src/main/java/com/destroystokyo/paper/io/PrioritizedTaskQueue.java
|
|
create mode 100644 src/main/java/com/destroystokyo/paper/io/QueueExecutorThread.java
|
|
create mode 100644 src/main/java/com/destroystokyo/paper/io/chunk/ChunkLoadTask.java
|
|
create mode 100644 src/main/java/com/destroystokyo/paper/io/chunk/ChunkLoadTaskManager.java
|
|
|
|
diff --git a/src/main/java/co/aikar/timings/WorldTimingsHandler.java b/src/main/java/co/aikar/timings/WorldTimingsHandler.java
|
|
index 366de66657..7f623270f9 100644
|
|
--- a/src/main/java/co/aikar/timings/WorldTimingsHandler.java
|
|
+++ b/src/main/java/co/aikar/timings/WorldTimingsHandler.java
|
|
@@ -51,6 +51,13 @@ public class WorldTimingsHandler {
|
|
public final Timing worldSaveChunks;
|
|
public final Timing worldSaveLevel;
|
|
public final Timing chunkSaveData;
|
|
+ public final Timing poiUnload;
|
|
+ public final Timing chunkUnload;
|
|
+ public final Timing poiSaveDataSerialization;
|
|
+ public final Timing chunkSave;
|
|
+ public final Timing chunkSaveOverwriteCheck;
|
|
+ public final Timing chunkSaveDataSerialization;
|
|
+ public final Timing chunkSaveIOWait;
|
|
|
|
public WorldTimingsHandler(World server) {
|
|
String name = server.worldData.getName() +" - ";
|
|
@@ -99,6 +106,13 @@ public class WorldTimingsHandler {
|
|
tracker2 = Timings.ofSafe(name + "tracker stage 2");
|
|
doTick = Timings.ofSafe(name + "doTick");
|
|
tickEntities = Timings.ofSafe(name + "tickEntities");
|
|
+ poiUnload = Timings.ofSafe(name + "Chunk unload - POI");
|
|
+ chunkUnload = Timings.ofSafe(name + "Chunk unload - Chunk");
|
|
+ poiSaveDataSerialization = Timings.ofSafe(name + "Chunk save - POI Data serialization");
|
|
+ chunkSave = Timings.ofSafe(name + "Chunk save - Chunk");
|
|
+ chunkSaveOverwriteCheck = Timings.ofSafe(name + "Chunk save - Chunk Overwrite check");
|
|
+ chunkSaveDataSerialization = Timings.ofSafe(name + "Chunk save - Chunk Data serialization");
|
|
+ chunkSaveIOWait = Timings.ofSafe(name + "Chunk save - Chunk IO wait");
|
|
}
|
|
|
|
public static Timing getTickList(WorldServer worldserver, String timingsType) {
|
|
diff --git a/src/main/java/com/destroystokyo/paper/antixray/ChunkPacketBlockControllerAntiXray.java b/src/main/java/com/destroystokyo/paper/antixray/ChunkPacketBlockControllerAntiXray.java
|
|
index 9d8bee5cac..c2dd59f573 100644
|
|
--- a/src/main/java/com/destroystokyo/paper/antixray/ChunkPacketBlockControllerAntiXray.java
|
|
+++ b/src/main/java/com/destroystokyo/paper/antixray/ChunkPacketBlockControllerAntiXray.java
|
|
@@ -150,6 +150,12 @@ public class ChunkPacketBlockControllerAntiXray extends ChunkPacketBlockControll
|
|
|
|
private final AtomicInteger xrayRequests = new AtomicInteger();
|
|
|
|
+ // Paper start - async chunk api
|
|
+ private Integer nextTicketHold() {
|
|
+ return Integer.valueOf(this.xrayRequests.getAndIncrement());
|
|
+ }
|
|
+ // Paper end
|
|
+
|
|
private Integer addXrayTickets(final int x, final int z, final ChunkProviderServer chunkProvider) {
|
|
final Integer hold = Integer.valueOf(this.xrayRequests.getAndIncrement());
|
|
|
|
@@ -181,6 +187,33 @@ public class ChunkPacketBlockControllerAntiXray extends ChunkPacketBlockControll
|
|
chunk.world.getChunkAt(locX, locZ + 1);
|
|
}
|
|
|
|
+ // Paper start - async chunk api
|
|
+ private void loadNeighbourAsync(ChunkProviderServer chunkProvider, int chunkX, int chunkZ, int[] counter, java.util.function.Consumer<Chunk> onNeighourLoad, Runnable onAllNeighboursLoad) {
|
|
+ chunkProvider.getChunkAtAsynchronously(chunkX, chunkZ, true, (Chunk neighbour) -> {
|
|
+ onNeighourLoad.accept(neighbour);
|
|
+ if (++counter[0] == 4) {
|
|
+ onAllNeighboursLoad.run();
|
|
+ }
|
|
+ });
|
|
+ }
|
|
+
|
|
+ private void loadNeighboursAsync(Chunk chunk, java.util.function.Consumer<Chunk> onNeighourLoad, Runnable onAllNeighboursLoad) {
|
|
+ int[] loaded = new int[1];
|
|
+
|
|
+ int locX = chunk.getPos().x;
|
|
+ int locZ = chunk.getPos().z;
|
|
+
|
|
+ onNeighourLoad.accept(chunk);
|
|
+
|
|
+ ChunkProviderServer chunkProvider = ((WorldServer)chunk.world).getChunkProvider();
|
|
+
|
|
+ this.loadNeighbourAsync(chunkProvider, locX - 1, locZ, loaded, onNeighourLoad, onAllNeighboursLoad);
|
|
+ this.loadNeighbourAsync(chunkProvider, locX + 1, locZ, loaded, onNeighourLoad, onAllNeighboursLoad);
|
|
+ this.loadNeighbourAsync(chunkProvider, locX, locZ - 1, loaded, onNeighourLoad, onAllNeighboursLoad);
|
|
+ this.loadNeighbourAsync(chunkProvider, locX, locZ + 1, loaded, onNeighourLoad, onAllNeighboursLoad);
|
|
+ }
|
|
+ // Paper end
|
|
+
|
|
@Override
|
|
public boolean onChunkPacketCreate(Chunk chunk, int chunkSectionSelector, boolean force) {
|
|
int locX = chunk.getPos().x;
|
|
@@ -256,11 +289,15 @@ public class ChunkPacketBlockControllerAntiXray extends ChunkPacketBlockControll
|
|
|
|
if (chunks[0] == null || chunks[1] == null || chunks[2] == null || chunks[3] == null) {
|
|
// we need to load
|
|
- MinecraftServer.getServer().scheduleOnMain(() -> {
|
|
- Integer ticketHold = this.addXrayTickets(locX, locZ, world.getChunkProvider());
|
|
- this.loadNeighbours(chunk);
|
|
+ // Paper start - async chunk api
|
|
+ Integer ticketHold = this.nextTicketHold();
|
|
+ this.loadNeighboursAsync(chunk, (Chunk neighbour) -> { // when a neighbour is loaded
|
|
+ ((WorldServer)neighbour.world).getChunkProvider().addTicket(TicketType.ANTIXRAY, neighbour.getPos(), 0, ticketHold);
|
|
+ },
|
|
+ () -> { // once neighbours get loaded
|
|
this.modifyBlocks(packetPlayOutMapChunk, chunkPacketInfo, false, ticketHold);
|
|
});
|
|
+ // Paper end
|
|
return;
|
|
}
|
|
|
|
diff --git a/src/main/java/com/destroystokyo/paper/io/ConcreteFileIOThread.java b/src/main/java/com/destroystokyo/paper/io/ConcreteFileIOThread.java
|
|
new file mode 100644
|
|
index 0000000000..8cbe522f23
|
|
--- /dev/null
|
|
+++ b/src/main/java/com/destroystokyo/paper/io/ConcreteFileIOThread.java
|
|
@@ -0,0 +1,661 @@
|
|
+package com.destroystokyo.paper.io;
|
|
+
|
|
+import net.minecraft.server.ChunkCoordIntPair;
|
|
+import net.minecraft.server.ExceptionWorldConflict;
|
|
+import net.minecraft.server.MinecraftServer;
|
|
+import net.minecraft.server.NBTTagCompound;
|
|
+import net.minecraft.server.RegionFile;
|
|
+import net.minecraft.server.WorldServer;
|
|
+import org.apache.logging.log4j.Logger;
|
|
+
|
|
+import java.io.IOException;
|
|
+import java.util.concurrent.CompletableFuture;
|
|
+import java.util.concurrent.ConcurrentHashMap;
|
|
+import java.util.concurrent.atomic.AtomicLong;
|
|
+import java.util.function.Consumer;
|
|
+import java.util.function.Function;
|
|
+
|
|
+/**
|
|
+ * Prioritized singleton thread responsible for all chunk IO that occurs in a minecraft server.
|
|
+ *
|
|
+ * <p>
|
|
+ * Singleton access: {@link Holder#INSTANCE}
|
|
+ * </p>
|
|
+ *
|
|
+ * <p>
|
|
+ * All functions provided are MT-Safe, however certain ordering constraints are (but not enforced):
|
|
+ * <li>
|
|
+ * Chunk saves may not occur for unloaded chunks.
|
|
+ * </li>
|
|
+ * <li>
|
|
+ * Tasks must be scheduled on the main thread.
|
|
+ * </li>
|
|
+ * </p>
|
|
+ *
|
|
+ * @see Holder#INSTANCE
|
|
+ * @see #scheduleSave(WorldServer, int, int, NBTTagCompound, NBTTagCompound, int)
|
|
+ * @see #loadChunkDataAsync(WorldServer, int, int, int, Consumer, boolean, boolean, boolean)
|
|
+ */
|
|
+public final class ConcreteFileIOThread extends QueueExecutorThread<PrioritizedTaskQueue.PrioritizedTask> {
|
|
+
|
|
+ public static final Logger LOGGER = MinecraftServer.LOGGER;
|
|
+ public static final NBTTagCompound FAILURE_VALUE = new NBTTagCompound();
|
|
+
|
|
+ public static final class Holder {
|
|
+
|
|
+ public static final ConcreteFileIOThread INSTANCE = new ConcreteFileIOThread();
|
|
+
|
|
+ static {
|
|
+ INSTANCE.start();
|
|
+ }
|
|
+ }
|
|
+
|
|
+ private final AtomicLong writeCounter = new AtomicLong();
|
|
+
|
|
+ private ConcreteFileIOThread() {
|
|
+ super(new PrioritizedTaskQueue<>(), (int)(1.0e6)); // 1.0ms spinwait time
|
|
+ this.setName("Concrete RegionFile IO Thread");
|
|
+ this.setPriority(Thread.NORM_PRIORITY - 1); // we keep priority close to normal because threads can wait on us
|
|
+ this.setUncaughtExceptionHandler((final Thread unused, final Throwable thr) -> {
|
|
+ LOGGER.fatal("Uncaught exception thrown from IO thread, report this!", thr);
|
|
+ });
|
|
+ }
|
|
+
|
|
+ /* run() is implemented by superclass */
|
|
+
|
|
+ /*
|
|
+ *
|
|
+ * IO thread will perform reads before writes
|
|
+ *
|
|
+ * How reads/writes are scheduled:
|
|
+ *
|
|
+ * If read in progress while scheduling write, ignore read and schedule write
|
|
+ * If read in progress while scheduling read (no write in progress), chain the read task
|
|
+ *
|
|
+ *
|
|
+ * If write in progress while scheduling read, use the pending write data and ret immediately
|
|
+ * If write in progress while scheduling write (ignore read in progress), overwrite the write in progress data
|
|
+ *
|
|
+ * This allows the reads and writes to act as if they occur synchronously to the thread scheduling them, however
|
|
+ * it fails to properly propagate write failures
|
|
+ *
|
|
+ */
|
|
+
|
|
+ /**
|
|
+ * Attempts to bump the priority of all IO tasks for the given chunk coordinates. This has no effect if no tasks are queued.
|
|
+ * @param world Chunk's world
|
|
+ * @param chunkX Chunk's x coordinate
|
|
+ * @param chunkZ Chunk's z coordinate
|
|
+ * @param priority Priority level to try to bump to
|
|
+ */
|
|
+ public void bumpPriority(final WorldServer world, final int chunkX, final int chunkZ, final int priority) {
|
|
+ if (!PrioritizedTaskQueue.validPriority(priority)) {
|
|
+ throw new IllegalArgumentException("Invalid priority: " + priority);
|
|
+ }
|
|
+
|
|
+ final Long key = Long.valueOf(IOUtil.getCoordinateKey(chunkX, chunkZ));
|
|
+
|
|
+ final ChunkDataTask poiTask = world.poiDataController.tasks.get(key);
|
|
+ final ChunkDataTask chunkTask = world.chunkDataController.tasks.get(key);
|
|
+
|
|
+ if (poiTask != null) {
|
|
+ poiTask.raisePriority(priority);
|
|
+ }
|
|
+ if (chunkTask != null) {
|
|
+ chunkTask.raisePriority(priority);
|
|
+ }
|
|
+ }
|
|
+
|
|
+ // Hack start
|
|
+ /**
|
|
+ * if {@code waitForRead} is true, then this task will wait on an available read task, else it will wait on an available
|
|
+ * write task
|
|
+ * if {@code poiTask} is true, then this task will wait on a poi task, else it will wait on chunk data task
|
|
+ * @deprecated API is garbage and will only work for main thread queueing of tasks (which is vanilla), plugins messing
|
|
+ * around asynchronously will give unexpected results
|
|
+ * @return whether the task succeeded, or {@code null} if there is no task
|
|
+ */
|
|
+ @Deprecated
|
|
+ public Boolean waitForIOToComplete(final WorldServer world, final int chunkX, final int chunkZ, final boolean waitForRead,
|
|
+ final boolean poiTask) {
|
|
+ final ChunkDataTask task;
|
|
+
|
|
+ final Long key = IOUtil.getCoordinateKey(chunkX, chunkZ);
|
|
+ if (poiTask) {
|
|
+ task = world.poiDataController.tasks.get(key);
|
|
+ } else {
|
|
+ task = world.chunkDataController.tasks.get(key);
|
|
+ }
|
|
+
|
|
+ if (task == null) {
|
|
+ return null;
|
|
+ }
|
|
+
|
|
+ if (waitForRead) {
|
|
+ ChunkDataController.InProgressRead read = task.inProgressRead;
|
|
+ if (read == null) {
|
|
+ return null;
|
|
+ }
|
|
+ return Boolean.valueOf(read.readFuture.join() != null);
|
|
+ }
|
|
+
|
|
+ // wait for write
|
|
+ ChunkDataController.InProgressWrite write = task.inProgressWrite;
|
|
+ if (write == null) {
|
|
+ return null;
|
|
+ }
|
|
+ return Boolean.valueOf(write.wrote.join() != null);
|
|
+ }
|
|
+ // Hack end
|
|
+
|
|
+ public NBTTagCompound getPendingWrite(final WorldServer world, final int chunkX, final int chunkZ, final boolean poiData) {
|
|
+ final ChunkDataController taskController = poiData ? world.poiDataController : world.chunkDataController;
|
|
+
|
|
+ final ChunkDataTask dataTask = taskController.tasks.get(Long.valueOf(IOUtil.getCoordinateKey(chunkX, chunkZ)));
|
|
+
|
|
+ if (dataTask == null) {
|
|
+ return null;
|
|
+ }
|
|
+
|
|
+ final ChunkDataController.InProgressWrite write = dataTask.inProgressWrite;
|
|
+
|
|
+ if (write == null) {
|
|
+ return null;
|
|
+ }
|
|
+
|
|
+ return write.data;
|
|
+ }
|
|
+
|
|
+ /**
|
|
+ * Sets the priority of all IO tasks for the given chunk coordinates. This has no effect if no tasks are queued.
|
|
+ * @param world Chunk's world
|
|
+ * @param chunkX Chunk's x coordinate
|
|
+ * @param chunkZ Chunk's z coordinate
|
|
+ * @param priority Priority level to set to
|
|
+ */
|
|
+ public void setPriority(final WorldServer world, final int chunkX, final int chunkZ, final int priority) {
|
|
+ if (!PrioritizedTaskQueue.validPriority(priority)) {
|
|
+ throw new IllegalArgumentException("Invalid priority: " + priority);
|
|
+ }
|
|
+
|
|
+ final Long key = Long.valueOf(IOUtil.getCoordinateKey(chunkX, chunkZ));
|
|
+
|
|
+ final ChunkDataTask poiTask = world.poiDataController.tasks.get(key);
|
|
+ final ChunkDataTask chunkTask = world.chunkDataController.tasks.get(key);
|
|
+
|
|
+ if (poiTask != null) {
|
|
+ poiTask.updatePriority(priority);
|
|
+ }
|
|
+ if (chunkTask != null) {
|
|
+ chunkTask.updatePriority(priority);
|
|
+ }
|
|
+ }
|
|
+
|
|
+ /**
|
|
+ * Schedules the chunk data to be written asynchronously.
|
|
+ * <p>
|
|
+ * Impl notes:
|
|
+ * </p>
|
|
+ * <li>
|
|
+ * This function presumes a chunk load for the coordinates is not called during this function (anytime after is OK). This means
|
|
+ * saves must be scheduled before a chunk is unloaded.
|
|
+ * </li>
|
|
+ * <li>
|
|
+ * Writes may be called concurrently, although only the "later" write will go through.
|
|
+ * </li>
|
|
+ * @param world Chunk's world
|
|
+ * @param chunkX Chunk's x coordinate
|
|
+ * @param chunkZ Chunk's z coordinate
|
|
+ * @param poiData Chunk point of interest data. If {@code null}, then no poi data is saved.
|
|
+ * @param chunkData Chunk data. If {@code null}, then no chunk data is saved.
|
|
+ * @param priority Priority level for this task. See {@link PrioritizedTaskQueue}
|
|
+ * @throws IllegalArgumentException If both {@code poiData} and {@code chunkData} are {@code null}.
|
|
+ * @throws IllegalStateException If the file io thread has shutdown.
|
|
+ */
|
|
+ public void scheduleSave(final WorldServer world, final int chunkX, final int chunkZ,
|
|
+ final NBTTagCompound poiData, final NBTTagCompound chunkData,
|
|
+ final int priority) throws IllegalArgumentException {
|
|
+ if (!PrioritizedTaskQueue.validPriority(priority)) {
|
|
+ throw new IllegalArgumentException("Invalid priority: " + priority);
|
|
+ }
|
|
+
|
|
+ final long writeCounter = this.writeCounter.getAndIncrement();
|
|
+
|
|
+ if (poiData != null) {
|
|
+ this.scheduleWrite(world.poiDataController, world, chunkX, chunkZ, poiData, priority, writeCounter);
|
|
+ }
|
|
+ if (chunkData != null) {
|
|
+ this.scheduleWrite(world.chunkDataController, world, chunkX, chunkZ, chunkData, priority, writeCounter);
|
|
+ }
|
|
+ }
|
|
+
|
|
+ private void scheduleWrite(final ChunkDataController dataController, final WorldServer world,
|
|
+ final int chunkX, final int chunkZ, final NBTTagCompound data, final int priority, final long writeCounter) {
|
|
+ dataController.tasks.compute(Long.valueOf(IOUtil.getCoordinateKey(chunkX, chunkZ)), (final Long keyInMap, final ChunkDataTask taskRunning) -> {
|
|
+ if (taskRunning == null) {
|
|
+ // no task is scheduled
|
|
+
|
|
+ // create task
|
|
+ final ChunkDataTask newTask = new ChunkDataTask(priority, world, chunkX, chunkZ, dataController);
|
|
+ newTask.inProgressWrite = new ChunkDataController.InProgressWrite();
|
|
+ newTask.inProgressWrite.writeCounter = writeCounter;
|
|
+ newTask.inProgressWrite.data = data;
|
|
+
|
|
+ ConcreteFileIOThread.this.queueTask(newTask); // schedule
|
|
+ return newTask;
|
|
+ }
|
|
+
|
|
+ taskRunning.raisePriority(priority);
|
|
+
|
|
+ if (taskRunning.inProgressWrite == null) {
|
|
+ taskRunning.inProgressWrite = new ChunkDataController.InProgressWrite();
|
|
+ }
|
|
+
|
|
+ boolean reschedule = taskRunning.inProgressWrite.writeCounter == -1L;
|
|
+
|
|
+ // synchronize for readers
|
|
+ //noinspection SynchronizationOnLocalVariableOrMethodParameter
|
|
+ synchronized (taskRunning) {
|
|
+ taskRunning.inProgressWrite.data = data;
|
|
+ taskRunning.inProgressWrite.writeCounter = writeCounter;
|
|
+ }
|
|
+
|
|
+ if (reschedule) {
|
|
+ // We need to reschedule this task since the previous one is not currently scheduled since it failed
|
|
+ taskRunning.reschedule(priority);
|
|
+ }
|
|
+
|
|
+ return taskRunning;
|
|
+ });
|
|
+ }
|
|
+
|
|
+ /**
|
|
+ * Same as {@link #loadChunkDataAsync(WorldServer, int, int, int, Consumer, boolean, boolean, boolean)}, except this function returns
|
|
+ * a {@link CompletableFuture} which is potentially completed <b>ASYNCHRONOUSLY ON THE FILE IO THREAD</b> when the load task
|
|
+ * has completed.
|
|
+ * <p>
|
|
+ * Note that if the chunk fails to load the returned future is completed with {@code null}.
|
|
+ * </p>
|
|
+ */
|
|
+ public CompletableFuture<ChunkData> loadChunkDataAsyncFuture(final WorldServer world, final int chunkX, final int chunkZ,
|
|
+ final int priority, final boolean readPoiData, final boolean readChunkData,
|
|
+ final boolean intendingToBlock) {
|
|
+ final CompletableFuture<ChunkData> future = new CompletableFuture<>();
|
|
+ this.loadChunkDataAsync(world, chunkX, chunkZ, priority, future::complete, readPoiData, readChunkData, intendingToBlock);
|
|
+ return future;
|
|
+ }
|
|
+
|
|
+ /**
|
|
+ * Schedules a load to be executed asynchronously.
|
|
+ * <p>
|
|
+ * Impl notes:
|
|
+ * </p>
|
|
+ * <li>
|
|
+ * If a chunk fails to load, the {@code onComplete} parameter is completed with {@code null}.
|
|
+ * </li>
|
|
+ * <li>
|
|
+ * It is possible for the {@code onComplete} parameter to be given {@link ChunkData} containing data
|
|
+ * this call did not request.
|
|
+ * </li>
|
|
+ * <li>
|
|
+ * The {@code onComplete} parameter may be completed during the execution of this function synchronously or it may
|
|
+ * be completed asynchronously on this file io thread. Interacting with the file IO thread in the completion of
|
|
+ * data is undefined behaviour, and can cause deadlock.
|
|
+ * </li>
|
|
+ * @param world Chunk's world
|
|
+ * @param chunkX Chunk's x coordinate
|
|
+ * @param chunkZ Chunk's z coordinate
|
|
+ * @param priority Priority level for this task. See {@link PrioritizedTaskQueue}
|
|
+ * @param onComplete Consumer to execute once this task has completed
|
|
+ * @param readPoiData Whether to read point of interest data. If {@code false}, the {@code NBTTagCompound} will be {@code null}.
|
|
+ * @param readChunkData Whether to read chunk data. If {@code false}, the {@code NBTTagCompound} will be {@code null}.
|
|
+ * @return The {@link PrioritizedTaskQueue.PrioritizedTask} associated with this task. Note that this task does not support
|
|
+ * cancellation.
|
|
+ */
|
|
+ public void loadChunkDataAsync(final WorldServer world, final int chunkX, final int chunkZ,
|
|
+ final int priority, final Consumer<ChunkData> onComplete,
|
|
+ final boolean readPoiData, final boolean readChunkData,
|
|
+ final boolean intendingToBlock) {
|
|
+ if (!PrioritizedTaskQueue.validPriority(priority)) {
|
|
+ throw new IllegalArgumentException("Invalid priority: " + priority);
|
|
+ }
|
|
+
|
|
+ if (!(readPoiData | readChunkData)) {
|
|
+ throw new IllegalArgumentException("Must read chunk data or poi data");
|
|
+ }
|
|
+
|
|
+ final ChunkData complete = new ChunkData();
|
|
+ final boolean[] requireCompletion = new boolean[] { readPoiData, readChunkData };
|
|
+
|
|
+ if (readPoiData) {
|
|
+ this.scheduleRead(world.poiDataController, world, chunkX, chunkZ, (final NBTTagCompound poiData) -> {
|
|
+ complete.poiData = poiData;
|
|
+
|
|
+ final boolean finished;
|
|
+
|
|
+ // avoid a race condition where the file io thread completes and we complete synchronously
|
|
+ // Note: Synchronization can be elided if both of the accesses are volatile
|
|
+ synchronized (requireCompletion) {
|
|
+ requireCompletion[0] = false; // 0 -> poi data
|
|
+ finished = !requireCompletion[1]; // 1 -> chunk data
|
|
+ }
|
|
+
|
|
+ if (finished) {
|
|
+ onComplete.accept(complete);
|
|
+ }
|
|
+ }, priority, intendingToBlock);
|
|
+ }
|
|
+
|
|
+ if (readChunkData) {
|
|
+ this.scheduleRead(world.chunkDataController, world, chunkX, chunkZ, (final NBTTagCompound chunkData) -> {
|
|
+ complete.chunkData = chunkData;
|
|
+
|
|
+ final boolean finished;
|
|
+
|
|
+ // avoid a race condition where the file io thread completes and we complete synchronously
|
|
+ // Note: Synchronization can be elided if both of the accesses are volatile
|
|
+ synchronized (requireCompletion) {
|
|
+ requireCompletion[1] = false; // 1 -> chunk data
|
|
+ finished = !requireCompletion[0]; // 0 -> poi data
|
|
+ }
|
|
+
|
|
+ if (finished) {
|
|
+ onComplete.accept(complete);
|
|
+ }
|
|
+ }, priority, intendingToBlock);
|
|
+ }
|
|
+
|
|
+ }
|
|
+
|
|
+ // Note: the onComplete may be called asynchronously or synchronously here.
|
|
+ private void scheduleRead(final ChunkDataController dataController, final WorldServer world,
|
|
+ final int chunkX, final int chunkZ, final Consumer<NBTTagCompound> onComplete, final int priority,
|
|
+ final boolean intendingToBlock) {
|
|
+
|
|
+ Function<RegionFile, Boolean> tryLoadFunction = (final RegionFile file) -> {
|
|
+ if (file == null) {
|
|
+ return Boolean.TRUE;
|
|
+ }
|
|
+ return Boolean.valueOf(file.chunkExists(new ChunkCoordIntPair(chunkX, chunkZ)));
|
|
+ };
|
|
+
|
|
+ dataController.tasks.compute(Long.valueOf(IOUtil.getCoordinateKey(chunkX, chunkZ)), (final Long keyInMap, final ChunkDataTask running) -> {
|
|
+ if (running == null) {
|
|
+ // not scheduled
|
|
+
|
|
+ final Boolean shouldSchedule = intendingToBlock ? dataController.computeForRegionFile(chunkX, chunkZ, tryLoadFunction) :
|
|
+ dataController.computeForRegionFileIfLoaded(chunkX, chunkZ, tryLoadFunction);
|
|
+
|
|
+ if (shouldSchedule == Boolean.FALSE) {
|
|
+ // not on disk
|
|
+ onComplete.accept(null);
|
|
+ return null;
|
|
+ }
|
|
+
|
|
+ // set up task
|
|
+ final ChunkDataTask newTask = new ChunkDataTask(priority, world, chunkX, chunkZ, dataController);
|
|
+ newTask.inProgressRead = new ChunkDataController.InProgressRead();
|
|
+ newTask.inProgressRead.readFuture.thenAccept(onComplete);
|
|
+
|
|
+ ConcreteFileIOThread.this.queueTask(newTask); // schedule task
|
|
+ return newTask;
|
|
+ }
|
|
+
|
|
+ running.raisePriority(priority);
|
|
+
|
|
+ if (running.inProgressWrite == null) {
|
|
+ // chain to the read future
|
|
+ running.inProgressRead.readFuture.thenAccept(onComplete);
|
|
+ return running;
|
|
+ }
|
|
+
|
|
+ // at this stage we have to use the in progress write's data to avoid an order issue
|
|
+ // we don't synchronize since all writes to data occur in the compute() call
|
|
+ onComplete.accept(running.inProgressWrite.data);
|
|
+ return running;
|
|
+ });
|
|
+ }
|
|
+
|
|
+ /**
|
|
+ * Same as {@link #loadChunkDataAsync(WorldServer, int, int, int, Consumer, boolean, boolean, boolean)}, except this function returns
|
|
+ * the {@link ChunkData} associated with the specified chunk when the task is complete.
|
|
+ * @return The chunk data, or {@code null} if the chunk failed to load.
|
|
+ */
|
|
+ public ChunkData loadChunkData(final WorldServer world, final int chunkX, final int chunkZ, final int priority,
|
|
+ final boolean readPoiData, final boolean readChunkData) {
|
|
+ return this.loadChunkDataAsyncFuture(world, chunkX, chunkZ, priority, readPoiData, readChunkData, true).join();
|
|
+ }
|
|
+
|
|
+ /**
|
|
+ * Schedules the given task at the specified priority to be executed on the IO thread.
|
|
+ * <p>
|
|
+ * Internal api. Do not use.
|
|
+ * </p>
|
|
+ */
|
|
+ public void runTask(final int priority, final Runnable runnable) {
|
|
+ this.queueTask(new GeneralTask(priority, runnable));
|
|
+ }
|
|
+
|
|
+ static final class GeneralTask extends PrioritizedTaskQueue.PrioritizedTask implements Runnable {
|
|
+
|
|
+ private final Runnable run;
|
|
+
|
|
+ public GeneralTask(final int priority, final Runnable run) {
|
|
+ super(priority);
|
|
+ this.run = IOUtil.notNull(run, "Task may not be null");
|
|
+ }
|
|
+
|
|
+ @Override
|
|
+ public void run() {
|
|
+ try {
|
|
+ this.run.run();
|
|
+ } catch (final Throwable throwable) {
|
|
+ if (throwable instanceof ThreadDeath) {
|
|
+ throw (ThreadDeath)throwable;
|
|
+ }
|
|
+ LOGGER.fatal("Failed to execute general task on IO thread " + IOUtil.genericToString(this.run), throwable);
|
|
+ }
|
|
+ }
|
|
+ }
|
|
+
|
|
+ public static final class ChunkData {
|
|
+
|
|
+ public NBTTagCompound poiData;
|
|
+ public NBTTagCompound chunkData;
|
|
+
|
|
+ public ChunkData() {}
|
|
+
|
|
+ public ChunkData(final NBTTagCompound poiData, final NBTTagCompound chunkData) {
|
|
+ this.poiData = poiData;
|
|
+ this.chunkData = chunkData;
|
|
+ }
|
|
+ }
|
|
+
|
|
+ public static abstract class ChunkDataController {
|
|
+
|
|
+ // ConcurrentHashMap synchronizes per chain, so reduce the chance of task's hashes colliding.
|
|
+ public final ConcurrentHashMap<Long, ChunkDataTask> tasks = new ConcurrentHashMap<>(64, 0.5f);
|
|
+
|
|
+ public abstract void writeData(final int x, final int z, final NBTTagCompound compound) throws IOException;
|
|
+ public abstract NBTTagCompound readData(final int x, final int z) throws IOException;
|
|
+
|
|
+ public abstract <T> T computeForRegionFile(final int chunkX, final int chunkZ, final Function<RegionFile, T> function);
|
|
+ public abstract <T> T computeForRegionFileIfLoaded(final int chunkX, final int chunkZ, final Function<RegionFile, T> function);
|
|
+
|
|
+ public static final class InProgressWrite {
|
|
+ public long writeCounter;
|
|
+ public NBTTagCompound data;
|
|
+
|
|
+ // Hack start
|
|
+ @Deprecated
|
|
+ public CompletableFuture<NBTTagCompound> wrote = new CompletableFuture<>();
|
|
+ // Hack end
|
|
+ }
|
|
+
|
|
+ public static final class InProgressRead {
|
|
+ public final CompletableFuture<NBTTagCompound> readFuture = new CompletableFuture<>();
|
|
+ }
|
|
+ }
|
|
+
|
|
+ public static final class ChunkDataTask extends PrioritizedTaskQueue.PrioritizedTask implements Runnable {
|
|
+
|
|
+ public ChunkDataController.InProgressWrite inProgressWrite;
|
|
+ public ChunkDataController.InProgressRead inProgressRead;
|
|
+
|
|
+ private final WorldServer world;
|
|
+ private final int x;
|
|
+ private final int z;
|
|
+ private final ChunkDataController taskController;
|
|
+
|
|
+ public ChunkDataTask(final int priority, final WorldServer world, final int x, final int z, final ChunkDataController taskController) {
|
|
+ super(priority);
|
|
+ this.world = world;
|
|
+ this.x = x;
|
|
+ this.z = z;
|
|
+ this.taskController = taskController;
|
|
+ }
|
|
+
|
|
+ @Override
|
|
+ public String toString() {
|
|
+ return "Task for world " + this.world.getWorld().getName() + " at " + this.x + ", " + this.z + " poi: " + (this.taskController == this.world.poiDataController);
|
|
+ }
|
|
+
|
|
+ /*
|
|
+ *
|
|
+ * IO thread will perform reads before writes
|
|
+ *
|
|
+ * How reads/writes are scheduled:
|
|
+ *
|
|
+ * If read in progress while scheduling write, ignore read and schedule write
|
|
+ * If read in progress while scheduling read (no write in progress), chain the read task
|
|
+ *
|
|
+ *
|
|
+ * If write in progress while scheduling read, use the pending write data and ret immediately
|
|
+ * If write in progress while scheduling write (ignore read in progress), overwrite the write in progress data
|
|
+ *
|
|
+ * This allows the reads and writes to act as if they occur synchronously to the thread scheduling them, however
|
|
+ * it fails to properly propagate write failures
|
|
+ *
|
|
+ */
|
|
+
|
|
+ void reschedule(final int priority) {
|
|
+ // priority is checked before this stage
|
|
+ this.queue.lazySet(null);
|
|
+ this.inProgressWrite.wrote = new CompletableFuture<>(); // Hack
|
|
+ this.priority.lazySet(priority);
|
|
+ ConcreteFileIOThread.Holder.INSTANCE.queueTask(this);
|
|
+ }
|
|
+
|
|
+ @Override
|
|
+ public void run() {
|
|
+ ChunkDataController.InProgressRead read = this.inProgressRead;
|
|
+ if (read != null) {
|
|
+ NBTTagCompound compound = ConcreteFileIOThread.FAILURE_VALUE;
|
|
+ try {
|
|
+ compound = this.taskController.readData(this.x, this.z);
|
|
+ } catch (final Throwable thr) {
|
|
+ if (thr instanceof ThreadDeath) {
|
|
+ throw (ThreadDeath)thr;
|
|
+ }
|
|
+ LOGGER.fatal("Failed to read chunk data for (" + this.x + "," + this.z + ")", thr);
|
|
+ // fall through to complete with null data
|
|
+ }
|
|
+ read.readFuture.complete(compound);
|
|
+ }
|
|
+
|
|
+ final Long chunkKey = Long.valueOf(IOUtil.getCoordinateKey(this.x, this.z));
|
|
+
|
|
+ ChunkDataController.InProgressWrite write = this.inProgressWrite;
|
|
+
|
|
+ if (write == null) {
|
|
+ // IntelliJ warns this is invalid, however it does not consider that writes to the task map & the inProgress field can occur concurrently.
|
|
+ ChunkDataTask inMap = this.taskController.tasks.compute(chunkKey, (final Long keyInMap, final ChunkDataTask valueInMap) -> {
|
|
+ if (valueInMap == null) {
|
|
+ throw new IllegalStateException("Write completed concurrently, report this");
|
|
+ }
|
|
+ if (valueInMap != ChunkDataTask.this) {
|
|
+ throw new IllegalStateException("Chunk task mismatch, report this");
|
|
+ }
|
|
+ return valueInMap.inProgressWrite == null ? null : valueInMap;
|
|
+ });
|
|
+
|
|
+ if (inMap == null) {
|
|
+ return; // set the task value to null, indicating we're done
|
|
+ }
|
|
+
|
|
+ // not null, which means there was a concurrent write
|
|
+ write = this.inProgressWrite;
|
|
+ }
|
|
+
|
|
+ // check if another process is writing
|
|
+ try {
|
|
+ this.world.checkSession();
|
|
+ } catch (final ExceptionWorldConflict ex) {
|
|
+ LOGGER.fatal("Couldn't save chunk; already in use by another instance of Minecraft?", ex);
|
|
+ // we don't need to set the write counter to -1 as we know at this stage there's no point in re-scheduling
|
|
+ // writes since they'll fail anyways.
|
|
+ write.wrote.complete(ConcreteFileIOThread.FAILURE_VALUE); // Hack - However we need to fail the write
|
|
+ return;
|
|
+ }
|
|
+
|
|
+ for (;;) {
|
|
+ final long writeCounter;
|
|
+ final NBTTagCompound data;
|
|
+
|
|
+ //noinspection SynchronizationOnLocalVariableOrMethodParameter
|
|
+ synchronized (write) {
|
|
+ writeCounter = write.writeCounter;
|
|
+ data = write.data;
|
|
+ }
|
|
+
|
|
+ boolean failedWrite = false;
|
|
+
|
|
+ try {
|
|
+ this.taskController.writeData(this.x, this.z, data);
|
|
+ } catch (final Throwable thr) {
|
|
+ if (thr instanceof ThreadDeath) {
|
|
+ throw (ThreadDeath)thr;
|
|
+ }
|
|
+ LOGGER.fatal("Failed to write chunk data for (" + this.x + "," + this.z + ")", thr);
|
|
+ failedWrite = true;
|
|
+ }
|
|
+
|
|
+ boolean finalFailWrite = failedWrite;
|
|
+ boolean[] returnFailWrite = new boolean[] { false };
|
|
+
|
|
+ ChunkDataTask inMap = this.taskController.tasks.compute(chunkKey, (final Long keyInMap, final ChunkDataTask valueInMap) -> {
|
|
+ if (valueInMap == null) {
|
|
+ throw new IllegalStateException("Write completed concurrently, report this");
|
|
+ }
|
|
+ if (valueInMap != ChunkDataTask.this) {
|
|
+ throw new IllegalStateException("Chunk task mismatch, report this");
|
|
+ }
|
|
+ if (finalFailWrite) {
|
|
+ if (valueInMap.inProgressWrite.writeCounter == writeCounter) {
|
|
+ valueInMap.inProgressWrite.writeCounter = -1L;
|
|
+ returnFailWrite[0] = true;
|
|
+ }
|
|
+ // Hack start
|
|
+ valueInMap.inProgressWrite.wrote.complete(ConcreteFileIOThread.FAILURE_VALUE);
|
|
+ return valueInMap;
|
|
+ }
|
|
+ if (valueInMap.inProgressWrite.writeCounter == writeCounter) {
|
|
+ valueInMap.inProgressWrite.wrote.complete(data);
|
|
+ return null;
|
|
+ }
|
|
+ return valueInMap;
|
|
+ // Hack end
|
|
+ });
|
|
+
|
|
+ if (inMap == null || returnFailWrite[0]) {
|
|
+ // write counter matched, so we wrote the most up-to-date pending data, we're done here
|
|
+ // or we failed to write and successfully set the write counter to -1
|
|
+ return; // we're done here
|
|
+ }
|
|
+
|
|
+ // fetch & write new data
|
|
+ continue;
|
|
+ }
|
|
+ }
|
|
+ }
|
|
+}
|
|
diff --git a/src/main/java/com/destroystokyo/paper/io/IOUtil.java b/src/main/java/com/destroystokyo/paper/io/IOUtil.java
|
|
new file mode 100644
|
|
index 0000000000..5af0ac3d9e
|
|
--- /dev/null
|
|
+++ b/src/main/java/com/destroystokyo/paper/io/IOUtil.java
|
|
@@ -0,0 +1,62 @@
|
|
+package com.destroystokyo.paper.io;
|
|
+
|
|
+import org.bukkit.Bukkit;
|
|
+
|
|
+public final class IOUtil {
|
|
+
|
|
+ /* Copied from concrete or concurrentutil */
|
|
+
|
|
+ public static long getCoordinateKey(final int x, final int z) {
|
|
+ return ((long)z << 32) | (x & 0xFFFFFFFFL);
|
|
+ }
|
|
+
|
|
+ public static int getCoordinateX(final long key) {
|
|
+ return (int)key;
|
|
+ }
|
|
+
|
|
+ public static int getCoordinateZ(final long key) {
|
|
+ return (int)(key >>> 32);
|
|
+ }
|
|
+
|
|
+ public static int getRegionCoordinate(final int chunkCoordinate) {
|
|
+ return chunkCoordinate >> 5;
|
|
+ }
|
|
+
|
|
+ public static int getChunkInRegion(final int chunkCoordinate) {
|
|
+ return chunkCoordinate & 31;
|
|
+ }
|
|
+
|
|
+ public static String genericToString(final Object object) {
|
|
+ return object == null ? "null" : object.getClass().getName() + ":" + object.toString();
|
|
+ }
|
|
+
|
|
+ public static <T> T notNull(final T obj) {
|
|
+ if (obj == null) {
|
|
+ throw new NullPointerException();
|
|
+ }
|
|
+ return obj;
|
|
+ }
|
|
+
|
|
+ public static <T> T notNull(final T obj, final String msgIfNull) {
|
|
+ if (obj == null) {
|
|
+ throw new NullPointerException(msgIfNull);
|
|
+ }
|
|
+ return obj;
|
|
+ }
|
|
+
|
|
+ public static void arrayBounds(final int off, final int len, final int arrayLength, final String msgPrefix) {
|
|
+ if (off < 0 || len < 0 || (arrayLength - off) < len) {
|
|
+ throw new ArrayIndexOutOfBoundsException(msgPrefix + ": off: " + off + ", len: " + len + ", array length: " + arrayLength);
|
|
+ }
|
|
+ }
|
|
+
|
|
+ public static int getPriorityForCurrentThread() {
|
|
+ return Bukkit.isPrimaryThread() ? PrioritizedTaskQueue.HIGHEST_PRIORITY : PrioritizedTaskQueue.NORMAL_PRIORITY;
|
|
+ }
|
|
+
|
|
+ @SuppressWarnings("unchecked")
|
|
+ public static <T extends Throwable> void rethrow(final Throwable throwable) throws T {
|
|
+ throw (T)throwable;
|
|
+ }
|
|
+
|
|
+}
|
|
diff --git a/src/main/java/com/destroystokyo/paper/io/PrioritizedTaskQueue.java b/src/main/java/com/destroystokyo/paper/io/PrioritizedTaskQueue.java
|
|
new file mode 100644
|
|
index 0000000000..c3ca3c4a1c
|
|
--- /dev/null
|
|
+++ b/src/main/java/com/destroystokyo/paper/io/PrioritizedTaskQueue.java
|
|
@@ -0,0 +1,258 @@
|
|
+package com.destroystokyo.paper.io;
|
|
+
|
|
+import java.util.concurrent.ConcurrentLinkedQueue;
|
|
+import java.util.concurrent.atomic.AtomicBoolean;
|
|
+import java.util.concurrent.atomic.AtomicInteger;
|
|
+import java.util.concurrent.atomic.AtomicReference;
|
|
+
|
|
+public class PrioritizedTaskQueue<T extends PrioritizedTaskQueue.PrioritizedTask> {
|
|
+
|
|
+ // lower numbers are a higher priority (except < 0)
|
|
+ // higher priorities are always executed before lower priorities
|
|
+
|
|
+ /**
|
|
+ * Priority value indicating the task has completed or is being completed.
|
|
+ */
|
|
+ public static final int COMPLETING_PRIORITY = -1;
|
|
+
|
|
+ /**
|
|
+ * Highest priority, should only be used for main thread tasks or tasks that are blocking the main thread.
|
|
+ */
|
|
+ public static final int HIGHEST_PRIORITY = 0;
|
|
+
|
|
+ /**
|
|
+ * Should be only used in an IO task so that chunk loads do not wait on other IO tasks.
|
|
+ * This only exists because IO tasks are scheduled before chunk load tasks to decrease IO waiting times.
|
|
+ */
|
|
+ public static final int HIGHER_PRIORITY = 1;
|
|
+
|
|
+ /**
|
|
+ * Should be used for scheduling chunk loads/generation that would increase response times to users.
|
|
+ */
|
|
+ public static final int HIGH_PRIORITY = 2;
|
|
+
|
|
+ /**
|
|
+ * Default priority.
|
|
+ */
|
|
+ public static final int NORMAL_PRIORITY = 3;
|
|
+
|
|
+ /**
|
|
+ * Use for tasks not at all critical and can potentially be delayed.
|
|
+ */
|
|
+ public static final int LOW_PRIORITY = 4;
|
|
+
|
|
+ /**
|
|
+ * Use for tasks that should "eventually" execute.
|
|
+ */
|
|
+ public static final int LOWEST_PRIORITY = 5;
|
|
+
|
|
+ private static final int TOTAL_PRIORITIES = 6;
|
|
+
|
|
+ final ConcurrentLinkedQueue<T>[] queues = (ConcurrentLinkedQueue<T>[])new ConcurrentLinkedQueue[TOTAL_PRIORITIES];
|
|
+
|
|
+ private final AtomicBoolean shutdown = new AtomicBoolean();
|
|
+
|
|
+ {
|
|
+ for (int i = 0; i < TOTAL_PRIORITIES; ++i) {
|
|
+ this.queues[i] = new ConcurrentLinkedQueue<>();
|
|
+ }
|
|
+ }
|
|
+
|
|
+ /**
|
|
+ * Returns whether the specified priority is valid
|
|
+ */
|
|
+ public static boolean validPriority(final int priority) {
|
|
+ return priority >= 0 && priority < TOTAL_PRIORITIES;
|
|
+ }
|
|
+
|
|
+ /**
|
|
+ * Queues a task.
|
|
+ * @throws IllegalStateException If the task has already been queued. Use {@link PrioritizedTask#raisePriority(int)} to
|
|
+ * raise a task's priority.
|
|
+ * This can also be thrown if the queue has shutdown.
|
|
+ */
|
|
+ public void add(final T task) throws IllegalStateException {
|
|
+ task.onQueue(this);
|
|
+ this.queues[task.getPriority()].add(task);
|
|
+ if (this.shutdown.get()) {
|
|
+ // note: we're not actually sure at this point if our task will go through
|
|
+ throw new IllegalStateException("Queue has shutdown, refusing to execute task " + IOUtil.genericToString(task));
|
|
+ }
|
|
+ }
|
|
+
|
|
+ /**
|
|
+ * Polls the highest priority task currently available. {@code null} if none.
|
|
+ */
|
|
+ public T poll() {
|
|
+ T task;
|
|
+ for (int i = 0; i < TOTAL_PRIORITIES; ++i) {
|
|
+ final ConcurrentLinkedQueue<T> queue = this.queues[i];
|
|
+
|
|
+ while ((task = queue.poll()) != null) {
|
|
+ final int prevPriority = task.tryComplete(i);
|
|
+ if (prevPriority != COMPLETING_PRIORITY && prevPriority <= i) {
|
|
+ // if the prev priority was greater-than or equal to our current priority
|
|
+ return task;
|
|
+ }
|
|
+ }
|
|
+ }
|
|
+
|
|
+ return null;
|
|
+ }
|
|
+
|
|
+ /**
|
|
+ * Prevent further additions to this queue. Attempts to add after this call has completed (potentially during) will
|
|
+ * result in {@link IllegalStateException} being thrown.
|
|
+ * <p>
|
|
+ * This operation is atomic with respect to other shutdown calls
|
|
+ * </p>
|
|
+ * <p>
|
|
+ * After this call has completed, regardless of return value, this queue will be shutdown.
|
|
+ * </p>
|
|
+ * @return {@code true} if the queue was shutdown, {@code false} if it has shut down already
|
|
+ */
|
|
+ public boolean shutdown() {
|
|
+ return this.shutdown.getAndSet(false);
|
|
+ }
|
|
+
|
|
+ public abstract static class PrioritizedTask {
|
|
+
|
|
+ protected final AtomicReference<PrioritizedTaskQueue> queue = new AtomicReference<>();
|
|
+
|
|
+ protected final AtomicInteger priority;
|
|
+
|
|
+ protected PrioritizedTask() {
|
|
+ this(PrioritizedTaskQueue.NORMAL_PRIORITY);
|
|
+ }
|
|
+
|
|
+ protected PrioritizedTask(final int priority) {
|
|
+ if (!PrioritizedTaskQueue.validPriority(priority)) {
|
|
+ throw new IllegalArgumentException("Invalid priority " + priority);
|
|
+ }
|
|
+ this.priority = new AtomicInteger(priority);
|
|
+ }
|
|
+
|
|
+ /**
|
|
+ * Returns the current priority. Note that {@link PrioritizedTaskQueue#COMPLETING_PRIORITY} will be returned
|
|
+ * if this task is completing or has completed.
|
|
+ */
|
|
+ public final int getPriority() {
|
|
+ return this.priority.get();
|
|
+ }
|
|
+
|
|
+ /**
|
|
+ * Returns whether this task is scheduled to execute, or has been already executed.
|
|
+ */
|
|
+ public boolean isScheduled() {
|
|
+ return this.queue.get() != null;
|
|
+ }
|
|
+
|
|
+ final int tryComplete(final int minPriority) {
|
|
+ for (int curr = this.getPriorityVolatile();;) {
|
|
+ if (curr == COMPLETING_PRIORITY) {
|
|
+ return COMPLETING_PRIORITY;
|
|
+ }
|
|
+ if (curr > minPriority) {
|
|
+ // curr is lower priority
|
|
+ return curr;
|
|
+ }
|
|
+
|
|
+ if (curr == (curr = this.compareAndExchangePriorityVolatile(curr, COMPLETING_PRIORITY))) {
|
|
+ return curr;
|
|
+ }
|
|
+ continue;
|
|
+ }
|
|
+ }
|
|
+
|
|
+ /**
|
|
+ * Forces this task to be completed.
|
|
+ * @return {@code true} if the task was cancelled, {@code false} if the task has already completed or is being completed.
|
|
+ */
|
|
+ public boolean cancel() {
|
|
+ return this.exchangePriorityVolatile(PrioritizedTaskQueue.COMPLETING_PRIORITY) != PrioritizedTaskQueue.COMPLETING_PRIORITY;
|
|
+ }
|
|
+
|
|
+ /**
|
|
+ * Attempts to raise the priority to the priority level specified.
|
|
+ * @param priority Priority specified
|
|
+ * @return {@code true} if successful, {@code false} otherwise.
|
|
+ */
|
|
+ public boolean raisePriority(final int priority) {
|
|
+ if (!PrioritizedTaskQueue.validPriority(priority)) {
|
|
+ throw new IllegalArgumentException("Invalid priority");
|
|
+ }
|
|
+
|
|
+ for (int curr = this.getPriorityVolatile();;) {
|
|
+ if (curr == COMPLETING_PRIORITY) {
|
|
+ return false;
|
|
+ }
|
|
+ if (priority >= curr) {
|
|
+ return true;
|
|
+ }
|
|
+
|
|
+ if (curr == (curr = this.compareAndExchangePriorityVolatile(curr, priority))) {
|
|
+ PrioritizedTaskQueue queue = this.queue.get();
|
|
+ if (queue != null) {
|
|
+ //noinspection unchecked
|
|
+ queue.queues[priority].add(this); // silently fail on shutdown
|
|
+ }
|
|
+ return true;
|
|
+ }
|
|
+ continue;
|
|
+ }
|
|
+ }
|
|
+
|
|
+ /**
|
|
+ * Attempts to set this task's priority level to the level specified.
|
|
+ * @param priority Specified priority level.
|
|
+ * @return {@code true} if successful, {@code false} if this task is completing or has completed.
|
|
+ */
|
|
+ public boolean updatePriority(final int priority) {
|
|
+ if (!PrioritizedTaskQueue.validPriority(priority)) {
|
|
+ throw new IllegalArgumentException("Invalid priority");
|
|
+ }
|
|
+
|
|
+ for (int curr = this.getPriorityVolatile();;) {
|
|
+ if (curr == COMPLETING_PRIORITY) {
|
|
+ return false;
|
|
+ }
|
|
+ if (curr == priority) {
|
|
+ return true;
|
|
+ }
|
|
+
|
|
+ if (curr == (curr = this.compareAndExchangePriorityVolatile(curr, priority))) {
|
|
+ PrioritizedTaskQueue queue = this.queue.get();
|
|
+ if (queue != null) {
|
|
+ //noinspection unchecked
|
|
+ queue.queues[priority].add(this); // silently fail on shutdown
|
|
+ }
|
|
+ return true;
|
|
+ }
|
|
+ continue;
|
|
+ }
|
|
+ }
|
|
+
|
|
+ void onQueue(final PrioritizedTaskQueue queue) {
|
|
+ if (this.queue.getAndSet(queue) != null) {
|
|
+ throw new IllegalStateException("Already queued!");
|
|
+ }
|
|
+ }
|
|
+
|
|
+ /* priority */
|
|
+
|
|
+ protected final int getPriorityVolatile() {
|
|
+ return this.priority.get();
|
|
+ }
|
|
+
|
|
+ protected final int compareAndExchangePriorityVolatile(final int expect, final int update) {
|
|
+ if (this.priority.compareAndSet(expect, update)) {
|
|
+ return expect;
|
|
+ }
|
|
+ return this.priority.get();
|
|
+ }
|
|
+
|
|
+ protected final int exchangePriorityVolatile(final int value) {
|
|
+ return this.priority.getAndSet(value);
|
|
+ }
|
|
+ }
|
|
+}
|
|
diff --git a/src/main/java/com/destroystokyo/paper/io/QueueExecutorThread.java b/src/main/java/com/destroystokyo/paper/io/QueueExecutorThread.java
|
|
new file mode 100644
|
|
index 0000000000..609b2038f2
|
|
--- /dev/null
|
|
+++ b/src/main/java/com/destroystokyo/paper/io/QueueExecutorThread.java
|
|
@@ -0,0 +1,216 @@
|
|
+package com.destroystokyo.paper.io;
|
|
+
|
|
+import net.minecraft.server.MinecraftServer;
|
|
+import org.apache.logging.log4j.Logger;
|
|
+
|
|
+import java.util.concurrent.ConcurrentLinkedQueue;
|
|
+import java.util.concurrent.atomic.AtomicBoolean;
|
|
+import java.util.concurrent.locks.LockSupport;
|
|
+
|
|
+public class QueueExecutorThread<T extends PrioritizedTaskQueue.PrioritizedTask> extends Thread {
|
|
+
|
|
+ private static final Logger LOGGER = MinecraftServer.LOGGER;
|
|
+
|
|
+ protected final PrioritizedTaskQueue<T> queue;
|
|
+ protected final long spinWaitTime;
|
|
+
|
|
+ protected volatile boolean closed;
|
|
+ protected final AtomicBoolean parked = new AtomicBoolean();
|
|
+
|
|
+ protected final ConcurrentLinkedQueue<Thread> flushQueue = new ConcurrentLinkedQueue<>();
|
|
+ protected volatile int flushCounter;
|
|
+
|
|
+ public QueueExecutorThread(final PrioritizedTaskQueue<T> queue) {
|
|
+ this(queue, (int)(1.e6)); // 1.0ms
|
|
+ }
|
|
+
|
|
+ public QueueExecutorThread(final PrioritizedTaskQueue<T> queue, final long spinWaitTime) { // in ms
|
|
+ this.queue = queue;
|
|
+ this.spinWaitTime = spinWaitTime;
|
|
+ }
|
|
+
|
|
+ @Override
|
|
+ public void run() {
|
|
+ final long spinWaitTime = this.spinWaitTime;
|
|
+ main_loop:
|
|
+ for (;;) {
|
|
+ this.pollTasks();
|
|
+
|
|
+ // spinwait
|
|
+
|
|
+ final long start = System.nanoTime();
|
|
+
|
|
+ for (;;) {
|
|
+ // If we are interrpted for any reason, park() will always return immediately. Clear so that we don't needlessly use cpu in such an event.
|
|
+ Thread.interrupted();
|
|
+ LockSupport.parkNanos("Spinwaiting on tasks", 1000L); // 1us
|
|
+
|
|
+ if (this.pollTasks()) {
|
|
+ continue main_loop;
|
|
+ }
|
|
+
|
|
+ if (this.handleClose()) {
|
|
+ return; // we're done
|
|
+ }
|
|
+
|
|
+ if ((System.nanoTime() - start) >= spinWaitTime) {
|
|
+ break;
|
|
+ }
|
|
+ }
|
|
+
|
|
+ if (this.handleClose()) {
|
|
+ return;
|
|
+ }
|
|
+
|
|
+ this.parked.set(true);
|
|
+ // We need to parse here to avoid a race condition where a thread queues a task before we set parked to true
|
|
+ if (this.pollTasks()) {
|
|
+ this.parked.set(false);
|
|
+ continue;
|
|
+ }
|
|
+
|
|
+ do {
|
|
+ Thread.interrupted();
|
|
+ LockSupport.park("Waiting on tasks");
|
|
+ } while (this.parked.get());
|
|
+ }
|
|
+ }
|
|
+
|
|
+ protected boolean handleClose() {
|
|
+ if (this.closed) {
|
|
+ // at this stage no task may be queued
|
|
+ this.pollTasks(); // this ensures we've emptied the queue
|
|
+ this.handleFlushThreads(true);
|
|
+ return true;
|
|
+ }
|
|
+ return false;
|
|
+ }
|
|
+
|
|
+ protected boolean pollTasks() {
|
|
+ Runnable task;
|
|
+ boolean ret = false;
|
|
+
|
|
+ while ((task = (Runnable)this.queue.poll()) != null) {
|
|
+ ret = true;
|
|
+ try {
|
|
+ task.run();
|
|
+ } catch (final Throwable throwable) {
|
|
+ if (throwable instanceof ThreadDeath) {
|
|
+ throw (ThreadDeath)throwable;
|
|
+ }
|
|
+ LOGGER.fatal("Exception thrown from prioritized runnable task in thread " + this.getName() + ": " + IOUtil.genericToString(task), throwable);
|
|
+ }
|
|
+ }
|
|
+
|
|
+ this.handleFlushThreads(false);
|
|
+
|
|
+ return ret;
|
|
+ }
|
|
+
|
|
+ protected void handleFlushThreads(final boolean shutdown) {
|
|
+ //noinspection NonAtomicOperationOnVolatileField
|
|
+ ++this.flushCounter;
|
|
+
|
|
+ Thread current;
|
|
+
|
|
+ while ((current = this.flushQueue.poll()) != null) {
|
|
+ LockSupport.unpark(current);
|
|
+ }
|
|
+
|
|
+ }
|
|
+
|
|
+ public boolean isWaitingForTasks() {
|
|
+ return this.parked.get();
|
|
+ }
|
|
+
|
|
+ public boolean notifyTasks() {
|
|
+ if (this.parked.get() && this.parked.getAndSet(false)) {
|
|
+ LockSupport.unpark(this);
|
|
+ return true;
|
|
+ }
|
|
+ return false;
|
|
+ }
|
|
+
|
|
+ public void onTaskQueue() {
|
|
+ if (this.parked.get() && this.parked.getAndSet(false)) {
|
|
+ LockSupport.unpark(this);
|
|
+ }
|
|
+ }
|
|
+
|
|
+ protected void queueTask(final T task) {
|
|
+ this.queue.add(task);
|
|
+ this.onTaskQueue();
|
|
+ }
|
|
+
|
|
+
|
|
+ /**
|
|
+ * Waits until this thread's queue is empty.
|
|
+ *
|
|
+ * @throws IllegalStateException If the current thread is {@code this} thread.
|
|
+ */
|
|
+ public void flush() {
|
|
+ final Thread currentThread = Thread.currentThread();
|
|
+
|
|
+ if (currentThread == this) {
|
|
+ throw new IllegalStateException("Cannot flush the queue executor thread while on the queue executor thread");
|
|
+ }
|
|
+
|
|
+ // order is important
|
|
+
|
|
+ if (this.closed) {
|
|
+ return;
|
|
+ }
|
|
+
|
|
+ if (this.parked.get()) {
|
|
+ return; // no tasks queued
|
|
+ }
|
|
+
|
|
+ int flushCounter = this.flushCounter;
|
|
+
|
|
+ this.flushQueue.add(currentThread);
|
|
+
|
|
+ if (this.closed) {
|
|
+ return;
|
|
+ }
|
|
+
|
|
+ // re-check parked status
|
|
+ if (this.parked.get()) {
|
|
+ return;
|
|
+ }
|
|
+
|
|
+ // force a response from the IO thread, we're not sure of its state currently
|
|
+ this.parked.set(false);
|
|
+ LockSupport.unpark(this);
|
|
+
|
|
+ boolean interrupted = false; // preserve interrupted status
|
|
+
|
|
+ while (this.flushCounter == flushCounter) {
|
|
+ interrupted |= Thread.interrupted();
|
|
+ LockSupport.park();
|
|
+ }
|
|
+
|
|
+ if (interrupted) {
|
|
+ Thread.currentThread().interrupt();
|
|
+ }
|
|
+ }
|
|
+
|
|
+ /**
|
|
+ * Closes this queue executor's queue and optionally waits for it to empty.
|
|
+ * <p>
|
|
+ * If wait is {@code true}, then the queue will be empty by the time this call completes.
|
|
+ * </p>
|
|
+ * <p>
|
|
+ * This function is MT-Safe.
|
|
+ * </p>
|
|
+ * @param wait If this call is to wait until the queue is empty
|
|
+ * @return whether this thread shut down the queue
|
|
+ */
|
|
+ public boolean close(final boolean wait) {
|
|
+ boolean ret = this.queue.shutdown();
|
|
+ this.closed = true;
|
|
+ if (wait) {
|
|
+ this.flush();
|
|
+ }
|
|
+ return ret;
|
|
+ }
|
|
+}
|
|
diff --git a/src/main/java/com/destroystokyo/paper/io/chunk/ChunkLoadTask.java b/src/main/java/com/destroystokyo/paper/io/chunk/ChunkLoadTask.java
|
|
new file mode 100644
|
|
index 0000000000..566d1684a5
|
|
--- /dev/null
|
|
+++ b/src/main/java/com/destroystokyo/paper/io/chunk/ChunkLoadTask.java
|
|
@@ -0,0 +1,104 @@
|
|
+package com.destroystokyo.paper.io.chunk;
|
|
+
|
|
+import com.destroystokyo.paper.io.ConcreteFileIOThread;
|
|
+import com.destroystokyo.paper.io.IOUtil;
|
|
+import com.destroystokyo.paper.io.PrioritizedTaskQueue;
|
|
+import net.minecraft.server.ChunkCoordIntPair;
|
|
+import net.minecraft.server.ChunkRegionLoader;
|
|
+import net.minecraft.server.PlayerChunkMap;
|
|
+import net.minecraft.server.WorldServer;
|
|
+
|
|
+import java.io.IOException;
|
|
+import java.util.ArrayDeque;
|
|
+import java.util.function.Consumer;
|
|
+
|
|
+public class ChunkLoadTask extends PrioritizedTaskQueue.PrioritizedTask implements Runnable {
|
|
+
|
|
+ public final WorldServer world;
|
|
+ public final int chunkX;
|
|
+ public final int chunkZ;
|
|
+ public final ChunkLoadTaskManager taskManager;
|
|
+
|
|
+ final Consumer<ChunkRegionLoader.InProgressChunkHolder> onComplete;
|
|
+
|
|
+ public ConcreteFileIOThread.ChunkData chunkData;
|
|
+
|
|
+ public ChunkLoadTask(final WorldServer world, final int chunkX, final int chunkZ, final int priority,
|
|
+ final Consumer<ChunkRegionLoader.InProgressChunkHolder> onComplete,
|
|
+ final ChunkLoadTaskManager taskManager) {
|
|
+ super(priority);
|
|
+ this.world = world;
|
|
+ this.chunkX = chunkX;
|
|
+ this.chunkZ = chunkZ;
|
|
+ this.taskManager = taskManager;
|
|
+ this.onComplete = onComplete;
|
|
+ }
|
|
+
|
|
+ private static final ArrayDeque<Runnable> EMPTY_QUEUE = new ArrayDeque<>();
|
|
+
|
|
+ private static ChunkRegionLoader.InProgressChunkHolder createEmptyHolder() {
|
|
+ return new ChunkRegionLoader.InProgressChunkHolder(null, EMPTY_QUEUE);
|
|
+ }
|
|
+
|
|
+ @Override
|
|
+ public void run() {
|
|
+ // either executed synchronously or asynchronously
|
|
+ final ConcreteFileIOThread.ChunkData chunkData = this.chunkData;
|
|
+
|
|
+ if (chunkData.poiData == ConcreteFileIOThread.FAILURE_VALUE || chunkData.chunkData == ConcreteFileIOThread.FAILURE_VALUE) {
|
|
+ ConcreteFileIOThread.LOGGER.error("Could not load chunk at (" + this.chunkX + "," + this.chunkZ + ")");
|
|
+ this.complete(ChunkLoadTask.createEmptyHolder());
|
|
+ return;
|
|
+ }
|
|
+
|
|
+ if (chunkData.chunkData == null) {
|
|
+ ChunkRegionLoader.InProgressChunkHolder ret = new ChunkRegionLoader.InProgressChunkHolder(null, EMPTY_QUEUE);
|
|
+ ret.poiData = chunkData.poiData;
|
|
+ this.complete(ret);
|
|
+ return;
|
|
+ }
|
|
+
|
|
+ final ChunkCoordIntPair chunkPos = new ChunkCoordIntPair(this.chunkX, this.chunkZ);
|
|
+
|
|
+ final PlayerChunkMap chunkManager = this.world.getChunkProvider().playerChunkMap;
|
|
+
|
|
+ final ChunkRegionLoader.InProgressChunkHolder chunkHolder = ChunkRegionLoader.loadChunk(this.world,
|
|
+ chunkManager.definedStructureManager, chunkManager.getVillagePlace(), chunkPos,
|
|
+ chunkData.chunkData, true);
|
|
+
|
|
+ // apply fixes
|
|
+
|
|
+ try {
|
|
+ chunkData.chunkData = chunkManager.getChunkData(this.world.getWorldProvider().getDimensionManager(),
|
|
+ chunkManager.getWorldPersistentDataSupplier(), chunkData.chunkData, chunkPos, this.world);
|
|
+ } catch (IOException ex) {
|
|
+ ConcreteFileIOThread.LOGGER.error("Could not apply datafixers for chunk at (" + this.chunkX + "," + this.chunkZ + ")", ex);
|
|
+ this.complete(ChunkLoadTask.createEmptyHolder());
|
|
+ }
|
|
+
|
|
+ this.complete(chunkHolder);
|
|
+ }
|
|
+
|
|
+ private void complete(final ChunkRegionLoader.InProgressChunkHolder holder) {
|
|
+ this.onComplete.accept(holder);
|
|
+ this.taskManager.tasks.compute(Long.valueOf(IOUtil.getCoordinateKey(this.chunkX, this.chunkZ)), (final Long keyInMap, final ChunkLoadTask valueInMap) -> {
|
|
+ if (valueInMap != ChunkLoadTask.this) {
|
|
+ throw new IllegalStateException("Expected this task to be scheduled, but another was! Other:" + valueInMap);
|
|
+ }
|
|
+ return null;
|
|
+ });
|
|
+ }
|
|
+
|
|
+ @Override
|
|
+ public boolean raisePriority(final int priority) {
|
|
+ ConcreteFileIOThread.Holder.INSTANCE.bumpPriority(this.world, this.chunkX, this.chunkZ, priority);
|
|
+ return super.raisePriority(priority);
|
|
+ }
|
|
+
|
|
+ @Override
|
|
+ public boolean updatePriority(final int priority) {
|
|
+ ConcreteFileIOThread.Holder.INSTANCE.setPriority(this.world, this.chunkX, this.chunkZ, priority);
|
|
+ return super.updatePriority(priority);
|
|
+ }
|
|
+
|
|
+}
|
|
diff --git a/src/main/java/com/destroystokyo/paper/io/chunk/ChunkLoadTaskManager.java b/src/main/java/com/destroystokyo/paper/io/chunk/ChunkLoadTaskManager.java
|
|
new file mode 100644
|
|
index 0000000000..8dbaaba3cd
|
|
--- /dev/null
|
|
+++ b/src/main/java/com/destroystokyo/paper/io/chunk/ChunkLoadTaskManager.java
|
|
@@ -0,0 +1,119 @@
|
|
+package com.destroystokyo.paper.io.chunk;
|
|
+
|
|
+import com.destroystokyo.paper.io.ConcreteFileIOThread;
|
|
+import com.destroystokyo.paper.io.IOUtil;
|
|
+import com.destroystokyo.paper.io.PrioritizedTaskQueue;
|
|
+import com.destroystokyo.paper.io.QueueExecutorThread;
|
|
+import net.minecraft.server.ChunkRegionLoader;
|
|
+import net.minecraft.server.IAsyncTaskHandler;
|
|
+import net.minecraft.server.WorldServer;
|
|
+import org.spigotmc.AsyncCatcher;
|
|
+
|
|
+import java.util.concurrent.ConcurrentHashMap;
|
|
+import java.util.function.Consumer;
|
|
+
|
|
+public final class ChunkLoadTaskManager {
|
|
+
|
|
+ private final QueueExecutorThread<ChunkLoadTask>[] workers;
|
|
+ private final WorldServer world;
|
|
+
|
|
+ private final PrioritizedTaskQueue<ChunkLoadTask> queue = new PrioritizedTaskQueue<>();
|
|
+
|
|
+ final ConcurrentHashMap<Long, ChunkLoadTask> tasks = new ConcurrentHashMap<>(64, 0.5f);
|
|
+
|
|
+ public ChunkLoadTaskManager(final WorldServer world, final int threads) {
|
|
+ this.world = world;
|
|
+ this.workers = threads <= 0 ? null : new QueueExecutorThread[threads];
|
|
+
|
|
+ for (int i = 0; i < threads; ++i) {
|
|
+ this.workers[i] = new QueueExecutorThread<>(this.queue, (long)0.10e6); //0.1ms
|
|
+ this.workers[i].setName("Async chunk loader thread for world: " + world.getWorldData().getName());
|
|
+ this.workers[i].setPriority(Thread.NORM_PRIORITY - 1);
|
|
+ this.workers[i].setUncaughtExceptionHandler((final Thread thread, final Throwable throwable) -> {
|
|
+ ConcreteFileIOThread.LOGGER.fatal("Thread '" + thread.getName() + "' threw an uncaught exception!", throwable);
|
|
+ });
|
|
+ this.workers[i].start();
|
|
+ }
|
|
+
|
|
+ }
|
|
+
|
|
+ public ChunkLoadTask scheduleChunkLoad(final int chunkX, final int chunkZ, final int priority,
|
|
+ final Consumer<ChunkRegionLoader.InProgressChunkHolder> onComplete,
|
|
+ final boolean intendingToBlock) {
|
|
+ AsyncCatcher.catchOp("Async chunk load schedule");
|
|
+ final WorldServer world = this.world;
|
|
+
|
|
+ return this.tasks.compute(Long.valueOf(IOUtil.getCoordinateKey(chunkX, chunkZ)), (final Long keyInMap, final ChunkLoadTask valueInMap) -> {
|
|
+ if (valueInMap != null) {
|
|
+ throw new IllegalStateException("Double scheduling chunk load");
|
|
+ }
|
|
+
|
|
+ final ChunkLoadTask ret = new ChunkLoadTask(world, chunkX, chunkZ, priority, onComplete, ChunkLoadTaskManager.this);
|
|
+
|
|
+ ConcreteFileIOThread.Holder.INSTANCE.loadChunkDataAsync(world, chunkX, chunkZ, priority, (final ConcreteFileIOThread.ChunkData chunkData) -> {
|
|
+ ret.chunkData = chunkData;
|
|
+ ChunkLoadTaskManager.this.internalSchedule(ret); // only schedule to the worker threads here
|
|
+ }, true, true, intendingToBlock);
|
|
+
|
|
+ return ret;
|
|
+ });
|
|
+ }
|
|
+
|
|
+ public void flush() {
|
|
+ ConcreteFileIOThread.Holder.INSTANCE.flush();
|
|
+ if (this.workers == null) {
|
|
+ return;
|
|
+ }
|
|
+ for (final QueueExecutorThread<ChunkLoadTask> worker : this.workers) {
|
|
+ worker.flush();
|
|
+ }
|
|
+ }
|
|
+
|
|
+ public void shutdown(final boolean wait) {
|
|
+ if (wait) {
|
|
+ ConcreteFileIOThread.Holder.INSTANCE.flush();
|
|
+ }
|
|
+
|
|
+ if (this.workers == null) {
|
|
+ return;
|
|
+ }
|
|
+
|
|
+ for (final QueueExecutorThread<ChunkLoadTask> worker : this.workers) {
|
|
+ worker.close(false);
|
|
+ }
|
|
+
|
|
+ if (wait) {
|
|
+ this.flush();
|
|
+ }
|
|
+ }
|
|
+
|
|
+ public void raisePriority(final int chunkX, final int chunkZ, final int priority) {
|
|
+ ChunkLoadTask task = this.tasks.get(Long.valueOf(IOUtil.getCoordinateKey(chunkX, chunkZ)));
|
|
+ if (task != null) {
|
|
+ task.raisePriority(priority);
|
|
+ this.internalScheduleNotify();
|
|
+ }
|
|
+ }
|
|
+
|
|
+ protected void internalSchedule(final ChunkLoadTask task) {
|
|
+ if (this.workers == null) {
|
|
+ ((IAsyncTaskHandler<Runnable>)this.world.getChunkProvider().serverThreadQueue).addTask(task);
|
|
+ return;
|
|
+ }
|
|
+
|
|
+ // It's important we order the task to be executed before notifying.
|
|
+ this.queue.add(task);
|
|
+ if (task.isScheduled()) {
|
|
+ this.internalScheduleNotify();
|
|
+ }
|
|
+ }
|
|
+
|
|
+ protected void internalScheduleNotify() {
|
|
+ for (final QueueExecutorThread<ChunkLoadTask> worker : this.workers) {
|
|
+ if (worker.notifyTasks()) {
|
|
+ break;
|
|
+ }
|
|
+ }
|
|
+ }
|
|
+
|
|
+}
|
|
diff --git a/src/main/java/net/minecraft/server/ChunkProviderServer.java b/src/main/java/net/minecraft/server/ChunkProviderServer.java
|
|
index d76860daf8..32a469025e 100644
|
|
--- a/src/main/java/net/minecraft/server/ChunkProviderServer.java
|
|
+++ b/src/main/java/net/minecraft/server/ChunkProviderServer.java
|
|
@@ -147,11 +147,143 @@ public class ChunkProviderServer extends IChunkProvider {
|
|
return playerChunk.getAvailableChunkNow();
|
|
|
|
}
|
|
+
|
|
+ private long asyncLoadSeqCounter;
|
|
+
|
|
+ public void getChunkAtAsynchronously(int x, int z, boolean gen, java.util.function.Consumer<Chunk> onComplete) {
|
|
+ if (Thread.currentThread() != this.serverThread) {
|
|
+ this.serverThreadQueue.execute(() -> {
|
|
+ this.getChunkAtAsynchronously(x, z, gen, onComplete);
|
|
+ });
|
|
+ return;
|
|
+ }
|
|
+
|
|
+ long k = ChunkCoordIntPair.pair(x, z);
|
|
+ ChunkCoordIntPair chunkPos = new ChunkCoordIntPair(x, z);
|
|
+
|
|
+ IChunkAccess ichunkaccess;
|
|
+
|
|
+ // try cache
|
|
+ for (int l = 0; l < 4; ++l) {
|
|
+ if (k == this.cachePos[l] && ChunkStatus.FULL == this.cacheStatus[l]) {
|
|
+ ichunkaccess = this.cacheChunk[l];
|
|
+ if (ichunkaccess != null) { // CraftBukkit - the chunk can become accessible in the meantime TODO for non-null chunks it might also make sense to check that the chunk's state hasn't changed in the meantime
|
|
+
|
|
+ // move to first in cache
|
|
+
|
|
+ for (int i1 = 3; i1 > 0; --i1) {
|
|
+ this.cachePos[i1] = this.cachePos[i1 - 1];
|
|
+ this.cacheStatus[i1] = this.cacheStatus[i1 - 1];
|
|
+ this.cacheChunk[i1] = this.cacheChunk[i1 - 1];
|
|
+ }
|
|
+
|
|
+ this.cachePos[0] = k;
|
|
+ this.cacheStatus[0] = ChunkStatus.FULL;
|
|
+ this.cacheChunk[0] = ichunkaccess;
|
|
+
|
|
+ onComplete.accept((Chunk)ichunkaccess);
|
|
+
|
|
+ return;
|
|
+ }
|
|
+ }
|
|
+ }
|
|
+
|
|
+ if (gen) {
|
|
+ this.bringToFullStatusAsync(x, z, chunkPos, onComplete);
|
|
+ return;
|
|
+ }
|
|
+
|
|
+ IChunkAccess current = this.getChunkAtImmediately(x, z); // we want to bypass ticket restrictions
|
|
+ if (current != null) {
|
|
+ if (!(current instanceof ProtoChunkExtension) && !(current instanceof net.minecraft.server.Chunk)) {
|
|
+ onComplete.accept(null); // the chunk is not gen'd
|
|
+ return;
|
|
+ }
|
|
+ // we know the chunk is at full status here (either in read-only mode or the real thing)
|
|
+ this.bringToFullStatusAsync(x, z, chunkPos, onComplete);
|
|
+ return;
|
|
+ } else {
|
|
+ // Paper start - async io
|
|
+ ChunkStatus status = world.getChunkProvider().playerChunkMap.getStatusOnDiskNoLoad(x, z); // Paper - async io - move to own method
|
|
+
|
|
+ if (status == ChunkStatus.EMPTY) {
|
|
+ // does not exist on disk
|
|
+ onComplete.accept(null);
|
|
+ return;
|
|
+ }
|
|
+
|
|
+ if (status == ChunkStatus.FULL) {
|
|
+ this.bringToFullStatusAsync(x, z, chunkPos, onComplete);
|
|
+ return;
|
|
+ } else if (status != null) {
|
|
+ onComplete.accept(null);
|
|
+ return; // not full status on disk
|
|
+ }
|
|
+ // status is null here
|
|
+ // Paper end
|
|
+
|
|
+ // at this stage we don't know what status the chunk is in
|
|
+ }
|
|
+
|
|
+ // here we don't know what status it is and we're not supposed to generate
|
|
+ // so we asynchronously load empty status
|
|
+
|
|
+ this.bringToStatusAsync(x, z, chunkPos, ChunkStatus.EMPTY, (IChunkAccess chunk) -> {
|
|
+ if (!(chunk instanceof ProtoChunkExtension) && !(chunk instanceof net.minecraft.server.Chunk)) {
|
|
+ // the chunk on disk was not a full status chunk
|
|
+ onComplete.accept(null);
|
|
+ return;
|
|
+ }
|
|
+ this.bringToFullStatusAsync(x, z, chunkPos, onComplete); // bring to full status if required
|
|
+ });
|
|
+ }
|
|
+
|
|
+ private void bringToFullStatusAsync(int x, int z, ChunkCoordIntPair chunkPos, java.util.function.Consumer<Chunk> onComplete) {
|
|
+ this.bringToStatusAsync(x, z, chunkPos, ChunkStatus.FULL, (java.util.function.Consumer)onComplete);
|
|
+ }
|
|
+
|
|
+ private void bringToStatusAsync(int x, int z, ChunkCoordIntPair chunkPos, ChunkStatus status, java.util.function.Consumer<IChunkAccess> onComplete) {
|
|
+ CompletableFuture<Either<IChunkAccess, PlayerChunk.Failure>> future = this.getChunkFutureMainThread(x, z, status, true);
|
|
+ long identifier = this.asyncLoadSeqCounter++;
|
|
+ int ticketLevel = MCUtil.getTicketLevelFor(status);
|
|
+ this.addTicketAtLevel(TicketType.ASYNC_LOAD, chunkPos, ticketLevel, identifier);
|
|
+
|
|
+ future.whenCompleteAsync((Either<IChunkAccess, PlayerChunk.Failure> either, Throwable throwable) -> {
|
|
+ // either left -> success
|
|
+ // either right -> failure
|
|
+
|
|
+ if (throwable != null) {
|
|
+ throw new RuntimeException(throwable);
|
|
+ }
|
|
+
|
|
+ this.removeTicketAtLevel(TicketType.ASYNC_LOAD, chunkPos, ticketLevel, identifier);
|
|
+ this.addTicketAtLevel(TicketType.UNKNOWN, chunkPos, ticketLevel, chunkPos); // allow unloading
|
|
+
|
|
+ Optional<PlayerChunk.Failure> failure = either.right();
|
|
+
|
|
+ if (failure.isPresent()) {
|
|
+ // failure
|
|
+ throw new IllegalStateException("Chunk failed to load: " + failure.get().toString());
|
|
+ }
|
|
+
|
|
+ onComplete.accept(either.left().get());
|
|
+
|
|
+ }, this.serverThreadQueue);
|
|
+ }
|
|
+
|
|
+ public <T> void addTicketAtLevel(TicketType<T> ticketType, ChunkCoordIntPair chunkPos, int ticketLevel, T identifier) {
|
|
+ this.chunkMapDistance.addTicketAtLevel(ticketType, chunkPos, ticketLevel, identifier);
|
|
+ }
|
|
+
|
|
+ public <T> void removeTicketAtLevel(TicketType<T> ticketType, ChunkCoordIntPair chunkPos, int ticketLevel, T identifier) {
|
|
+ this.chunkMapDistance.removeTicketAtLevel(ticketType, chunkPos, ticketLevel, identifier);
|
|
+ }
|
|
// Paper end
|
|
|
|
@Nullable
|
|
@Override
|
|
public IChunkAccess getChunkAt(int i, int j, ChunkStatus chunkstatus, boolean flag) {
|
|
+ final int x = i; final int z = j; // Paper - conflict on variable change
|
|
if (Thread.currentThread() != this.serverThread) {
|
|
return (IChunkAccess) CompletableFuture.supplyAsync(() -> {
|
|
return this.getChunkAt(i, j, chunkstatus, flag);
|
|
@@ -173,6 +305,9 @@ public class ChunkProviderServer extends IChunkProvider {
|
|
CompletableFuture<Either<IChunkAccess, PlayerChunk.Failure>> completablefuture = this.getChunkFutureMainThread(i, j, chunkstatus, flag);
|
|
|
|
if (!completablefuture.isDone()) { // Paper
|
|
+ // Paper start - async chunk io // Paper start - async chunk loading
|
|
+ this.world.asyncLoadManager.raisePriority(x, z, com.destroystokyo.paper.io.PrioritizedTaskQueue.HIGHEST_PRIORITY);
|
|
+ // Paper end
|
|
this.world.timings.chunkAwait.startTiming(); // Paper
|
|
this.serverThreadQueue.awaitTasks(completablefuture::isDone);
|
|
this.world.timings.chunkAwait.stopTiming(); // Paper
|
|
diff --git a/src/main/java/net/minecraft/server/ChunkRegionLoader.java b/src/main/java/net/minecraft/server/ChunkRegionLoader.java
|
|
index f88e3d957f..bee1330763 100644
|
|
--- a/src/main/java/net/minecraft/server/ChunkRegionLoader.java
|
|
+++ b/src/main/java/net/minecraft/server/ChunkRegionLoader.java
|
|
@@ -6,6 +6,7 @@ import it.unimi.dsi.fastutil.longs.LongOpenHashSet;
|
|
import it.unimi.dsi.fastutil.longs.LongSet;
|
|
import it.unimi.dsi.fastutil.shorts.ShortList;
|
|
import it.unimi.dsi.fastutil.shorts.ShortListIterator;
|
|
+import java.util.ArrayDeque; // Paper
|
|
import java.util.Arrays;
|
|
import java.util.BitSet;
|
|
import java.util.EnumSet;
|
|
@@ -22,7 +23,29 @@ public class ChunkRegionLoader {
|
|
|
|
private static final Logger LOGGER = LogManager.getLogger();
|
|
|
|
+ // Paper start
|
|
+ public static final class InProgressChunkHolder {
|
|
+
|
|
+ public final ProtoChunk protoChunk;
|
|
+ public final ArrayDeque<Runnable> tasks;
|
|
+
|
|
+ public NBTTagCompound poiData;
|
|
+
|
|
+ public InProgressChunkHolder(final ProtoChunk protoChunk, final ArrayDeque<Runnable> tasks) {
|
|
+ this.protoChunk = protoChunk;
|
|
+ this.tasks = tasks;
|
|
+ }
|
|
+ }
|
|
+
|
|
public static ProtoChunk loadChunk(WorldServer worldserver, DefinedStructureManager definedstructuremanager, VillagePlace villageplace, ChunkCoordIntPair chunkcoordintpair, NBTTagCompound nbttagcompound) {
|
|
+ InProgressChunkHolder holder = loadChunk(worldserver, definedstructuremanager, villageplace, chunkcoordintpair, nbttagcompound, true);
|
|
+ holder.tasks.forEach(Runnable::run);
|
|
+ return holder.protoChunk;
|
|
+ }
|
|
+
|
|
+ public static InProgressChunkHolder loadChunk(WorldServer worldserver, DefinedStructureManager definedstructuremanager, VillagePlace villageplace, ChunkCoordIntPair chunkcoordintpair, NBTTagCompound nbttagcompound, boolean distinguish) {
|
|
+ ArrayDeque<Runnable> tasksToExecuteOnMain = new ArrayDeque<>();
|
|
+ // Paper end
|
|
ChunkGenerator<?> chunkgenerator = worldserver.getChunkProvider().getChunkGenerator();
|
|
WorldChunkManager worldchunkmanager = chunkgenerator.getWorldChunkManager();
|
|
NBTTagCompound nbttagcompound1 = nbttagcompound.getCompound("Level");
|
|
@@ -66,7 +89,9 @@ public class ChunkRegionLoader {
|
|
LightEngine lightengine = chunkproviderserver.getLightEngine();
|
|
|
|
if (flag) {
|
|
- lightengine.b(chunkcoordintpair, true);
|
|
+ tasksToExecuteOnMain.add(() -> { // Paper - delay this task since we're executing off-main
|
|
+ lightengine.b(chunkcoordintpair, true);
|
|
+ }); // Paper - delay this task since we're executing off-main
|
|
}
|
|
|
|
for (int k = 0; k < nbttaglist.size(); ++k) {
|
|
@@ -82,16 +107,30 @@ public class ChunkRegionLoader {
|
|
achunksection[b0] = chunksection;
|
|
}
|
|
|
|
- villageplace.a(chunkcoordintpair, chunksection);
|
|
+ tasksToExecuteOnMain.add(() -> { // Paper - delay this task since we're executing off-main
|
|
+ villageplace.a(chunkcoordintpair, chunksection);
|
|
+ }); // Paper - delay this task since we're executing off-main
|
|
}
|
|
|
|
if (flag) {
|
|
if (nbttagcompound2.hasKeyOfType("BlockLight", 7)) {
|
|
- lightengine.a(EnumSkyBlock.BLOCK, SectionPosition.a(chunkcoordintpair, b0), new NibbleArray(nbttagcompound2.getByteArray("BlockLight")));
|
|
+ // Paper start - delay this task since we're executing off-main
|
|
+ NibbleArray blockLight = new NibbleArray(nbttagcompound2.getByteArray("BlockLight"));
|
|
+ // Note: We move the block light nibble array creation here for perf & in case the compound is modified
|
|
+ tasksToExecuteOnMain.add(() -> {
|
|
+ lightengine.a(EnumSkyBlock.BLOCK, SectionPosition.a(chunkcoordintpair, b0), blockLight);
|
|
+ });
|
|
+ // Paper end
|
|
}
|
|
|
|
if (flag2 && nbttagcompound2.hasKeyOfType("SkyLight", 7)) {
|
|
- lightengine.a(EnumSkyBlock.SKY, SectionPosition.a(chunkcoordintpair, b0), new NibbleArray(nbttagcompound2.getByteArray("SkyLight")));
|
|
+ // Paper start - delay this task since we're executing off-main
|
|
+ NibbleArray skyLight = new NibbleArray(nbttagcompound2.getByteArray("SkyLight"));
|
|
+ // Note: We move the block light nibble array creation here for perf & in case the compound is modified
|
|
+ tasksToExecuteOnMain.add(() -> {
|
|
+ lightengine.a(EnumSkyBlock.SKY, SectionPosition.a(chunkcoordintpair, b0), skyLight);
|
|
+ });
|
|
+ // Paper end
|
|
}
|
|
}
|
|
}
|
|
@@ -194,7 +233,7 @@ public class ChunkRegionLoader {
|
|
}
|
|
|
|
if (chunkstatus_type == ChunkStatus.Type.LEVELCHUNK) {
|
|
- return new ProtoChunkExtension((Chunk) object);
|
|
+ return new InProgressChunkHolder(new ProtoChunkExtension((Chunk) object), tasksToExecuteOnMain); // Paper - Async chunk loading
|
|
} else {
|
|
ProtoChunk protochunk1 = (ProtoChunk) object;
|
|
|
|
@@ -233,7 +272,7 @@ public class ChunkRegionLoader {
|
|
protochunk1.a(worldgenstage_features, BitSet.valueOf(nbttagcompound5.getByteArray(s1)));
|
|
}
|
|
|
|
- return protochunk1;
|
|
+ return new InProgressChunkHolder(protochunk1, tasksToExecuteOnMain); // Paper - Async chunk loading
|
|
}
|
|
}
|
|
|
|
diff --git a/src/main/java/net/minecraft/server/ChunkStatus.java b/src/main/java/net/minecraft/server/ChunkStatus.java
|
|
index e324989b46..abb0d69d2f 100644
|
|
--- a/src/main/java/net/minecraft/server/ChunkStatus.java
|
|
+++ b/src/main/java/net/minecraft/server/ChunkStatus.java
|
|
@@ -153,6 +153,7 @@ public class ChunkStatus {
|
|
return ChunkStatus.q.size();
|
|
}
|
|
|
|
+ public static int getTicketLevelOffset(ChunkStatus status) { return ChunkStatus.a(status); } // Paper - OBFHELPER
|
|
public static int a(ChunkStatus chunkstatus) {
|
|
return ChunkStatus.r.getInt(chunkstatus.c());
|
|
}
|
|
diff --git a/src/main/java/net/minecraft/server/IChunkLoader.java b/src/main/java/net/minecraft/server/IChunkLoader.java
|
|
index 3f14392e6e..cc933ec067 100644
|
|
--- a/src/main/java/net/minecraft/server/IChunkLoader.java
|
|
+++ b/src/main/java/net/minecraft/server/IChunkLoader.java
|
|
@@ -3,6 +3,10 @@ package net.minecraft.server;
|
|
import com.mojang.datafixers.DataFixer;
|
|
import java.io.File;
|
|
import java.io.IOException;
|
|
+// Paper start
|
|
+import java.util.concurrent.CompletableFuture;
|
|
+import java.util.concurrent.CompletionException;
|
|
+// Paper end
|
|
import java.util.function.Supplier;
|
|
import javax.annotation.Nullable;
|
|
|
|
@@ -10,7 +14,9 @@ public class IChunkLoader extends RegionFileCache {
|
|
|
|
protected final DataFixer b;
|
|
@Nullable
|
|
- private PersistentStructureLegacy a;
|
|
+ private volatile PersistentStructureLegacy a; // Paper - async chunk loading
|
|
+
|
|
+ private final Object persistentDataLock = new Object(); // Paper
|
|
|
|
public IChunkLoader(File file, DataFixer datafixer) {
|
|
super(file);
|
|
@@ -55,9 +61,26 @@ public class IChunkLoader extends RegionFileCache {
|
|
NBTTagCompound level = nbttagcompound.getCompound("Level");
|
|
if (level.getBoolean("TerrainPopulated") && !level.getBoolean("LightPopulated")) {
|
|
ChunkProviderServer cps = (generatoraccess == null) ? null : ((WorldServer) generatoraccess).getChunkProvider();
|
|
+ // Paper start - Async chunk loading
|
|
+ CompletableFuture<Void> future = new CompletableFuture<>();
|
|
+ MCUtil.ensureMain((Runnable)() -> {
|
|
+ try {
|
|
+ // Paper end
|
|
if (check(cps, pos.x - 1, pos.z) && check(cps, pos.x - 1, pos.z - 1) && check(cps, pos.x, pos.z - 1)) {
|
|
level.setBoolean("LightPopulated", true);
|
|
}
|
|
+ // Paper start - Async chunk loading
|
|
+ future.complete(null);
|
|
+ } catch (IOException ex) {
|
|
+ future.completeExceptionally(ex);
|
|
+ }
|
|
+ });
|
|
+ try {
|
|
+ future.join();
|
|
+ } catch (CompletionException ex) {
|
|
+ com.destroystokyo.paper.util.SneakyThrow.sneaky(ex.getCause());
|
|
+ }
|
|
+ // Paper end
|
|
}
|
|
}
|
|
// CraftBukkit end
|
|
@@ -65,11 +88,13 @@ public class IChunkLoader extends RegionFileCache {
|
|
if (i < 1493) {
|
|
nbttagcompound = GameProfileSerializer.a(this.b, DataFixTypes.CHUNK, nbttagcompound, i, 1493);
|
|
if (nbttagcompound.getCompound("Level").getBoolean("hasLegacyStructureData")) {
|
|
+ synchronized (this.persistentDataLock) { // Paper - Async chunk loading
|
|
if (this.a == null) {
|
|
this.a = PersistentStructureLegacy.a(dimensionmanager.getType(), (WorldPersistentData) supplier.get()); // CraftBukkit - getType
|
|
}
|
|
|
|
nbttagcompound = this.a.a(nbttagcompound);
|
|
+ } // Paper - Async chunk loading
|
|
}
|
|
}
|
|
|
|
@@ -89,7 +114,9 @@ public class IChunkLoader extends RegionFileCache {
|
|
public void write(ChunkCoordIntPair chunkcoordintpair, NBTTagCompound nbttagcompound) throws IOException {
|
|
super.write(chunkcoordintpair, nbttagcompound);
|
|
if (this.a != null) {
|
|
+ synchronized (this.persistentDataLock) { // Paper - Async chunk loading
|
|
this.a.a(chunkcoordintpair.pair());
|
|
+ } // Paper - Async chunk loading
|
|
}
|
|
|
|
}
|
|
diff --git a/src/main/java/net/minecraft/server/MCUtil.java b/src/main/java/net/minecraft/server/MCUtil.java
|
|
index 23d1935dd5..14f8b61042 100644
|
|
--- a/src/main/java/net/minecraft/server/MCUtil.java
|
|
+++ b/src/main/java/net/minecraft/server/MCUtil.java
|
|
@@ -530,4 +530,9 @@ public final class MCUtil {
|
|
out.print(fileData);
|
|
}
|
|
}
|
|
+
|
|
+ public static int getTicketLevelFor(ChunkStatus status) {
|
|
+ // TODO make sure the constant `33` is correct on future updates. See getChunkAt(int, int, ChunkStatus, boolean)
|
|
+ return 33 + ChunkStatus.getTicketLevelOffset(status);
|
|
+ }
|
|
}
|
|
diff --git a/src/main/java/net/minecraft/server/MinecraftServer.java b/src/main/java/net/minecraft/server/MinecraftServer.java
|
|
index 0324a90ca5..430cd70cf5 100644
|
|
--- a/src/main/java/net/minecraft/server/MinecraftServer.java
|
|
+++ b/src/main/java/net/minecraft/server/MinecraftServer.java
|
|
@@ -764,6 +764,7 @@ public abstract class MinecraftServer extends IAsyncTaskHandlerReentrant<TickTas
|
|
this.getUserCache().c(false); // Paper
|
|
}
|
|
// Spigot end
|
|
+ com.destroystokyo.paper.io.ConcreteFileIOThread.Holder.INSTANCE.close(true); // Paper
|
|
}
|
|
|
|
public String getServerIp() {
|
|
diff --git a/src/main/java/net/minecraft/server/PlayerChunkMap.java b/src/main/java/net/minecraft/server/PlayerChunkMap.java
|
|
index 89b72da828..089a20c835 100644
|
|
--- a/src/main/java/net/minecraft/server/PlayerChunkMap.java
|
|
+++ b/src/main/java/net/minecraft/server/PlayerChunkMap.java
|
|
@@ -57,7 +57,7 @@ public class PlayerChunkMap extends IChunkLoader implements PlayerChunk.d {
|
|
private final LightEngineThreaded lightEngine;
|
|
private final IAsyncTaskHandler<Runnable> executor;
|
|
public final ChunkGenerator<?> chunkGenerator;
|
|
- private final Supplier<WorldPersistentData> m;
|
|
+ private final Supplier<WorldPersistentData> m; public final Supplier<WorldPersistentData> getWorldPersistentDataSupplier() { return this.m; } // Paper - OBFHELPER
|
|
private final VillagePlace n;
|
|
public final LongSet unloadQueue;
|
|
private boolean updatingChunksModified;
|
|
@@ -67,7 +67,7 @@ public class PlayerChunkMap extends IChunkLoader implements PlayerChunk.d {
|
|
public final WorldLoadListener worldLoadListener;
|
|
public final PlayerChunkMap.a u; public final PlayerChunkMap.a getChunkMapDistanceManager() { return this.u; } // Paper - OBFHELPER // CraftBukkit - private -> public // PAIL chunkDistanceManager
|
|
private final AtomicInteger v;
|
|
- private final DefinedStructureManager definedStructureManager;
|
|
+ public final DefinedStructureManager definedStructureManager; // Paper - private -> public
|
|
private final File x;
|
|
private final PlayerMap playerMap;
|
|
public final Int2ObjectMap<PlayerChunkMap.EntityTracker> trackedEntities;
|
|
@@ -126,7 +126,7 @@ public class PlayerChunkMap extends IChunkLoader implements PlayerChunk.d {
|
|
this.lightEngine = new LightEngineThreaded(ilightaccess, this, this.world.getWorldProvider().g(), threadedmailbox2, this.q.a(threadedmailbox2, false));
|
|
this.u = new PlayerChunkMap.a(executor, iasynctaskhandler);
|
|
this.m = supplier;
|
|
- this.n = new VillagePlace(new File(this.x, "poi"), datafixer);
|
|
+ this.n = new VillagePlace(new File(this.x, "poi"), datafixer, this.world); // Paper
|
|
this.setViewDistance(i);
|
|
}
|
|
|
|
@@ -286,6 +286,7 @@ public class PlayerChunkMap extends IChunkLoader implements PlayerChunk.d {
|
|
@Override
|
|
public void close() throws IOException {
|
|
this.q.close();
|
|
+ this.world.asyncLoadManager.shutdown(true); // Paper - Required since we're closing regionfiles in the next line
|
|
this.n.close();
|
|
super.close();
|
|
}
|
|
@@ -333,7 +334,7 @@ public class PlayerChunkMap extends IChunkLoader implements PlayerChunk.d {
|
|
shouldSave = ((Chunk) ichunkaccess).lastSaved + world.paperConfig.autoSavePeriod <= world.getTime();
|
|
}
|
|
|
|
- if (shouldSave && this.saveChunk(ichunkaccess)) {
|
|
+ if (shouldSave && this.saveChunk(ichunkaccess, true)) { // Paper - async chunk io
|
|
++savedThisTick;
|
|
playerchunk.m();
|
|
}
|
|
@@ -353,11 +354,15 @@ public class PlayerChunkMap extends IChunkLoader implements PlayerChunk.d {
|
|
protected void unloadChunks(BooleanSupplier booleansupplier) {
|
|
GameProfilerFiller gameprofilerfiller = this.world.getMethodProfiler();
|
|
|
|
+ try (Timing ignored = this.world.timings.poiUnload.startTiming()) { // Paper
|
|
gameprofilerfiller.enter("poi");
|
|
this.n.a(booleansupplier);
|
|
+ }
|
|
gameprofilerfiller.exitEnter("chunk_unload");
|
|
if (!this.world.isSavingDisabled()) {
|
|
+ try (Timing ignored = this.world.timings.chunkUnload.startTiming()) { // Paper
|
|
this.b(booleansupplier);
|
|
+ }
|
|
}
|
|
|
|
gameprofilerfiller.exit();
|
|
@@ -411,7 +416,7 @@ public class PlayerChunkMap extends IChunkLoader implements PlayerChunk.d {
|
|
((Chunk) ichunkaccess).setLoaded(false);
|
|
}
|
|
|
|
- this.saveChunk(ichunkaccess);
|
|
+ this.saveChunk(ichunkaccess, true); // Paper - async chunk io
|
|
if (this.loadedChunks.remove(i) && ichunkaccess instanceof Chunk) {
|
|
Chunk chunk = (Chunk) ichunkaccess;
|
|
|
|
@@ -487,26 +492,30 @@ public class PlayerChunkMap extends IChunkLoader implements PlayerChunk.d {
|
|
}
|
|
}
|
|
|
|
+ // Paper start - Async chunk io
|
|
+ public NBTTagCompound completeChunkData(NBTTagCompound compound, ChunkCoordIntPair chunkcoordintpair) throws IOException {
|
|
+ return compound == null ? null : this.getChunkData(this.world.getWorldProvider().getDimensionManager(), this.getWorldPersistentDataSupplier(), compound, chunkcoordintpair, this.world);
|
|
+ }
|
|
+ // Paper end
|
|
+
|
|
private CompletableFuture<Either<IChunkAccess, PlayerChunk.Failure>> f(ChunkCoordIntPair chunkcoordintpair) {
|
|
- return CompletableFuture.supplyAsync(() -> {
|
|
+ // Paper start - Async chunk io
|
|
+ final java.util.function.BiFunction<ChunkRegionLoader.InProgressChunkHolder, Throwable, Either<IChunkAccess, PlayerChunk.Failure>> callable = (chunkHolder, ioThrowable) -> {
|
|
try (Timing ignored = this.world.timings.syncChunkLoadTimer.startTimingIfSync()) { // Paper
|
|
- NBTTagCompound nbttagcompound; // Paper
|
|
- try (Timing ignored2 = this.world.timings.chunkIOStage1.startTimingIfSync()) { // Paper
|
|
- nbttagcompound = this.readChunkData(chunkcoordintpair);
|
|
+ if (ioThrowable != null) {
|
|
+ com.destroystokyo.paper.io.IOUtil.rethrow(ioThrowable);
|
|
}
|
|
+ this.getVillagePlace().loadInData(chunkcoordintpair, chunkHolder.poiData);
|
|
+ chunkHolder.tasks.forEach(Runnable::run);
|
|
+ // Paper - async load completes this
|
|
+ // Paper end
|
|
|
|
- if (nbttagcompound != null) {
|
|
- boolean flag = nbttagcompound.hasKeyOfType("Level", 10) && nbttagcompound.getCompound("Level").hasKeyOfType("Status", 8);
|
|
-
|
|
- if (flag) {
|
|
- ProtoChunk protochunk = ChunkRegionLoader.loadChunk(this.world, this.definedStructureManager, this.n, chunkcoordintpair, nbttagcompound);
|
|
-
|
|
- protochunk.setLastSaved(this.world.getTime());
|
|
- return Either.left(protochunk);
|
|
- }
|
|
-
|
|
- PlayerChunkMap.LOGGER.error("Chunk file at {} is missing level data, skipping", chunkcoordintpair);
|
|
+ // Paper start - This is done async
|
|
+ if (chunkHolder.protoChunk != null) {
|
|
+ chunkHolder.protoChunk.setLastSaved(this.world.getTime());
|
|
+ return Either.left(chunkHolder.protoChunk);
|
|
}
|
|
+ // Paper end
|
|
} catch (ReportedException reportedexception) {
|
|
Throwable throwable = reportedexception.getCause();
|
|
|
|
@@ -520,7 +529,17 @@ public class PlayerChunkMap extends IChunkLoader implements PlayerChunk.d {
|
|
}
|
|
|
|
return Either.left(new ProtoChunk(chunkcoordintpair, ChunkConverter.a, this.world)); // Paper - Anti-Xray
|
|
- }, this.executor);
|
|
+ // Paper start - Async chunk io
|
|
+ };
|
|
+ CompletableFuture<Either<IChunkAccess, PlayerChunk.Failure>> ret = new CompletableFuture<>();
|
|
+ this.world.asyncLoadManager.scheduleChunkLoad(chunkcoordintpair.x, chunkcoordintpair.z,
|
|
+ com.destroystokyo.paper.io.PrioritizedTaskQueue.NORMAL_PRIORITY, (ChunkRegionLoader.InProgressChunkHolder holder) -> {
|
|
+ PlayerChunkMap.this.executor.addTask(() -> {
|
|
+ ret.complete(callable.apply(holder, null));
|
|
+ });
|
|
+ }, false);
|
|
+ return ret;
|
|
+ // Paper end
|
|
}
|
|
|
|
private CompletableFuture<Either<IChunkAccess, PlayerChunk.Failure>> b(PlayerChunk playerchunk, ChunkStatus chunkstatus) {
|
|
@@ -726,18 +745,42 @@ public class PlayerChunkMap extends IChunkLoader implements PlayerChunk.d {
|
|
return this.v.get();
|
|
}
|
|
|
|
+ // Paper start - async chunk io
|
|
+ private boolean writeDataAsync(ChunkCoordIntPair chunkPos, NBTTagCompound poiData, NBTTagCompound chunkData, boolean async) {
|
|
+ com.destroystokyo.paper.io.ConcreteFileIOThread.Holder.INSTANCE.scheduleSave(this.world, chunkPos.x, chunkPos.z,
|
|
+ poiData, chunkData, !async ? com.destroystokyo.paper.io.PrioritizedTaskQueue.HIGHEST_PRIORITY : com.destroystokyo.paper.io.PrioritizedTaskQueue.LOW_PRIORITY);
|
|
+
|
|
+ if (async) {
|
|
+ return true;
|
|
+ }
|
|
+ try (co.aikar.timings.Timing ignored = this.world.timings.chunkSaveIOWait.startTiming()) { // Paper
|
|
+ Boolean successPoi = com.destroystokyo.paper.io.ConcreteFileIOThread.Holder.INSTANCE.waitForIOToComplete(this.world, chunkPos.x, chunkPos.z, true, true);
|
|
+ Boolean successChunk = com.destroystokyo.paper.io.ConcreteFileIOThread.Holder.INSTANCE.waitForIOToComplete(this.world, chunkPos.x, chunkPos.z, true, false);
|
|
+
|
|
+ if (successPoi == Boolean.FALSE || successChunk == Boolean.FALSE) {
|
|
+ return false;
|
|
+ }
|
|
+
|
|
+ // null indicates no task existed, which means our write completed before we waited on it
|
|
+
|
|
+ return true;
|
|
+ } // Paper
|
|
+ }
|
|
+ // Paper end
|
|
+
|
|
public boolean saveChunk(IChunkAccess ichunkaccess) {
|
|
- this.n.a(ichunkaccess.getPos());
|
|
+ // Paper start - async param
|
|
+ return this.saveChunk(ichunkaccess, false);
|
|
+ }
|
|
+ public boolean saveChunk(IChunkAccess ichunkaccess, boolean async) {
|
|
+ try (co.aikar.timings.Timing ignored = this.world.timings.chunkSave.startTiming()) {
|
|
+ NBTTagCompound poiData = this.getVillagePlace().getData(ichunkaccess.getPos()); // Paper
|
|
+ //this.n.a(ichunkaccess.getPos()); // Delay
|
|
+ // Paper end
|
|
if (!ichunkaccess.isNeedsSaving()) {
|
|
return false;
|
|
} else {
|
|
- try {
|
|
- this.world.checkSession();
|
|
- } catch (ExceptionWorldConflict exceptionworldconflict) {
|
|
- PlayerChunkMap.LOGGER.error("Couldn't save chunk; already in use by another instance of Minecraft?", exceptionworldconflict);
|
|
- com.destroystokyo.paper.exception.ServerInternalException.reportInternalException(exceptionworldconflict); // Paper
|
|
- return false;
|
|
- }
|
|
+ // Paper - The save session check is performed on the IO thread
|
|
|
|
ichunkaccess.setLastSaved(this.world.getTime());
|
|
ichunkaccess.setNeedsSaving(false);
|
|
@@ -748,27 +791,33 @@ public class PlayerChunkMap extends IChunkLoader implements PlayerChunk.d {
|
|
NBTTagCompound nbttagcompound;
|
|
|
|
if (chunkstatus.getType() != ChunkStatus.Type.LEVELCHUNK) {
|
|
+ try (co.aikar.timings.Timing ignored1 = this.world.timings.chunkSaveOverwriteCheck.startTiming()) { // Paper
|
|
// Paper start - Optimize save by using status cache
|
|
ChunkStatus statusOnDisk = this.getChunkStatusOnDisk(chunkcoordintpair);
|
|
if (statusOnDisk != null && statusOnDisk.getType() == ChunkStatus.Type.LEVELCHUNK) {
|
|
// Paper end
|
|
+ this.writeDataAsync(ichunkaccess.getPos(), poiData, null, async); // Paper - Async chunk io
|
|
return false;
|
|
}
|
|
|
|
if (chunkstatus == ChunkStatus.EMPTY && ichunkaccess.h().values().stream().noneMatch(StructureStart::e)) {
|
|
+ this.writeDataAsync(ichunkaccess.getPos(), poiData, null, async); // Paper - Async chunk io
|
|
return false;
|
|
}
|
|
}
|
|
-
|
|
+ } // Paper
|
|
+ try (co.aikar.timings.Timing ignored1 = this.world.timings.chunkSaveDataSerialization.startTiming()) { // Paper
|
|
nbttagcompound = ChunkRegionLoader.saveChunk(this.world, ichunkaccess);
|
|
- this.write(chunkcoordintpair, nbttagcompound);
|
|
- return true;
|
|
+ } // Paper
|
|
+ return this.writeDataAsync(ichunkaccess.getPos(), poiData, nbttagcompound, async); // Paper - Async chunk io
|
|
+ //return true; // Paper
|
|
} catch (Exception exception) {
|
|
PlayerChunkMap.LOGGER.error("Failed to save chunk {},{}", chunkcoordintpair.x, chunkcoordintpair.z, exception);
|
|
com.destroystokyo.paper.exception.ServerInternalException.reportInternalException(exception); // Paper
|
|
return false;
|
|
}
|
|
}
|
|
+ } // Paper
|
|
}
|
|
|
|
protected void setViewDistance(int i) {
|
|
@@ -833,6 +882,42 @@ public class PlayerChunkMap extends IChunkLoader implements PlayerChunk.d {
|
|
return Iterables.unmodifiableIterable(this.visibleChunks.values());
|
|
}
|
|
|
|
+ // Paper start - Asynchronous chunk io
|
|
+ @Nullable
|
|
+ @Override
|
|
+ public NBTTagCompound read(ChunkCoordIntPair chunkcoordintpair) throws IOException {
|
|
+ if (Thread.currentThread() != com.destroystokyo.paper.io.ConcreteFileIOThread.Holder.INSTANCE) {
|
|
+ NBTTagCompound ret = com.destroystokyo.paper.io.ConcreteFileIOThread.Holder.INSTANCE
|
|
+ .loadChunkDataAsyncFuture(this.world, chunkcoordintpair.x, chunkcoordintpair.z, com.destroystokyo.paper.io.IOUtil.getPriorityForCurrentThread(),
|
|
+ false, true, true).join().chunkData;
|
|
+
|
|
+ if (ret == com.destroystokyo.paper.io.ConcreteFileIOThread.FAILURE_VALUE) {
|
|
+ throw new IOException("See logs for further detail");
|
|
+ }
|
|
+ return ret;
|
|
+ }
|
|
+ return super.read(chunkcoordintpair);
|
|
+ }
|
|
+
|
|
+ @Override
|
|
+ public void write(ChunkCoordIntPair chunkcoordintpair, NBTTagCompound nbttagcompound) throws IOException {
|
|
+ if (Thread.currentThread() != com.destroystokyo.paper.io.ConcreteFileIOThread.Holder.INSTANCE) {
|
|
+ com.destroystokyo.paper.io.ConcreteFileIOThread.Holder.INSTANCE.scheduleSave(
|
|
+ this.world, chunkcoordintpair.x, chunkcoordintpair.z, null, nbttagcompound,
|
|
+ com.destroystokyo.paper.io.IOUtil.getPriorityForCurrentThread());
|
|
+
|
|
+ Boolean ret = com.destroystokyo.paper.io.ConcreteFileIOThread.Holder.INSTANCE.waitForIOToComplete(this.world,
|
|
+ chunkcoordintpair.x, chunkcoordintpair.z, true, false);
|
|
+
|
|
+ if (ret == Boolean.FALSE) {
|
|
+ throw new IOException("See logs for further detail");
|
|
+ }
|
|
+ return;
|
|
+ }
|
|
+ super.write(chunkcoordintpair, nbttagcompound);
|
|
+ }
|
|
+ // Paper end
|
|
+
|
|
@Nullable
|
|
public NBTTagCompound readChunkData(ChunkCoordIntPair chunkcoordintpair) throws IOException { // Paper - private -> public
|
|
NBTTagCompound nbttagcompound = this.read(chunkcoordintpair);
|
|
@@ -855,12 +940,30 @@ public class PlayerChunkMap extends IChunkLoader implements PlayerChunk.d {
|
|
|
|
// Paper start - chunk status cache "api"
|
|
public ChunkStatus getChunkStatusOnDiskIfCached(ChunkCoordIntPair chunkPos) {
|
|
+ // Paper start - async io
|
|
+ NBTTagCompound inProgressWrite = com.destroystokyo.paper.io.ConcreteFileIOThread.Holder.INSTANCE
|
|
+ .getPendingWrite(this.world, chunkPos.x, chunkPos.z, false);
|
|
+
|
|
+ if (inProgressWrite != null) {
|
|
+ return ChunkRegionLoader.getStatus(inProgressWrite);
|
|
+ }
|
|
+ // Paper end
|
|
+
|
|
RegionFile regionFile = this.getRegionFileIfLoaded(chunkPos);
|
|
|
|
return regionFile == null ? null : regionFile.getStatusIfCached(chunkPos.x, chunkPos.z);
|
|
}
|
|
|
|
public ChunkStatus getChunkStatusOnDisk(ChunkCoordIntPair chunkPos) throws IOException {
|
|
+ // Paper start - async io
|
|
+ NBTTagCompound inProgressWrite = com.destroystokyo.paper.io.ConcreteFileIOThread.Holder.INSTANCE
|
|
+ .getPendingWrite(this.world, chunkPos.x, chunkPos.z, false);
|
|
+
|
|
+ if (inProgressWrite != null) {
|
|
+ return ChunkRegionLoader.getStatus(inProgressWrite);
|
|
+ }
|
|
+ // Paper end
|
|
+ synchronized (this) { // Paper - async io
|
|
RegionFile regionFile = this.getRegionFile(chunkPos, false);
|
|
|
|
if (!regionFile.chunkExists(chunkPos)) {
|
|
@@ -872,17 +975,49 @@ public class PlayerChunkMap extends IChunkLoader implements PlayerChunk.d {
|
|
if (status != null) {
|
|
return status;
|
|
}
|
|
+ // Paper start - async io
|
|
+ }
|
|
|
|
- this.readChunkData(chunkPos);
|
|
+ NBTTagCompound compound = this.readChunkData(chunkPos);
|
|
|
|
- return regionFile.getStatusIfCached(chunkPos.x, chunkPos.z);
|
|
+ return ChunkRegionLoader.getStatus(compound);
|
|
+ // Paper end
|
|
}
|
|
|
|
public void updateChunkStatusOnDisk(ChunkCoordIntPair chunkPos, @Nullable NBTTagCompound compound) throws IOException {
|
|
+ synchronized (this) { // Paper - async io
|
|
RegionFile regionFile = this.getRegionFile(chunkPos, false);
|
|
|
|
regionFile.setStatus(chunkPos.x, chunkPos.z, ChunkRegionLoader.getStatus(compound));
|
|
+ } // Paper - async io
|
|
}
|
|
+
|
|
+ // Paper start - async io
|
|
+ // this function will not load chunk data off disk to check for status
|
|
+ // ret null for unknown, empty for empty status on disk or absent from disk
|
|
+ public ChunkStatus getStatusOnDiskNoLoad(int x, int z) {
|
|
+ // Paper start - async io
|
|
+ net.minecraft.server.NBTTagCompound inProgressWrite = com.destroystokyo.paper.io.ConcreteFileIOThread.Holder.INSTANCE
|
|
+ .getPendingWrite(this.world, x, z, false);
|
|
+
|
|
+ if (inProgressWrite != null) {
|
|
+ return net.minecraft.server.ChunkRegionLoader.getStatus(inProgressWrite);
|
|
+ }
|
|
+ // Paper end
|
|
+ // variant of PlayerChunkMap#getChunkStatusOnDisk that does not load data off disk, but loads the region file
|
|
+ ChunkCoordIntPair chunkPos = new ChunkCoordIntPair(x, z);
|
|
+ synchronized (world.getChunkProvider().playerChunkMap) {
|
|
+ net.minecraft.server.RegionFile file;
|
|
+ try {
|
|
+ file = world.getChunkProvider().playerChunkMap.getRegionFile(chunkPos, false);
|
|
+ } catch (IOException ex) {
|
|
+ throw new RuntimeException(ex);
|
|
+ }
|
|
+
|
|
+ return !file.chunkExists(chunkPos) ? ChunkStatus.EMPTY : file.getStatusIfCached(x, z);
|
|
+ }
|
|
+ }
|
|
+ // Paper end
|
|
// Paper end
|
|
|
|
// Spigot Start
|
|
@@ -1222,6 +1357,7 @@ public class PlayerChunkMap extends IChunkLoader implements PlayerChunk.d {
|
|
|
|
}
|
|
|
|
+ public VillagePlace getVillagePlace() { return this.h(); } // Paper - OBFHELPER
|
|
protected VillagePlace h() {
|
|
return this.n;
|
|
}
|
|
diff --git a/src/main/java/net/minecraft/server/RegionFile.java b/src/main/java/net/minecraft/server/RegionFile.java
|
|
index 66c8b0307f..2ee4b88f09 100644
|
|
--- a/src/main/java/net/minecraft/server/RegionFile.java
|
|
+++ b/src/main/java/net/minecraft/server/RegionFile.java
|
|
@@ -337,7 +337,7 @@ public class RegionFile implements AutoCloseable {
|
|
this.writeInt(i); // Paper - Avoid 3 io write calls
|
|
}
|
|
|
|
- public void close() throws IOException {
|
|
+ public synchronized void close() throws IOException { // Paper - synchronize
|
|
this.closed = true; // Paper
|
|
this.b.close();
|
|
}
|
|
diff --git a/src/main/java/net/minecraft/server/RegionFileCache.java b/src/main/java/net/minecraft/server/RegionFileCache.java
|
|
index e0fdf5f90f..d9283b36b6 100644
|
|
--- a/src/main/java/net/minecraft/server/RegionFileCache.java
|
|
+++ b/src/main/java/net/minecraft/server/RegionFileCache.java
|
|
@@ -51,13 +51,13 @@ public abstract class RegionFileCache implements AutoCloseable {
|
|
}
|
|
|
|
// Paper start
|
|
- public RegionFile getRegionFileIfLoaded(ChunkCoordIntPair chunkcoordintpair) {
|
|
+ public synchronized RegionFile getRegionFileIfLoaded(ChunkCoordIntPair chunkcoordintpair) { // Paper - synchronize for async io
|
|
return this.cache.getAndMoveToFirst(ChunkCoordIntPair.pair(chunkcoordintpair.getRegionX(), chunkcoordintpair.getRegionZ()));
|
|
}
|
|
// Paper end
|
|
|
|
public RegionFile getRegionFile(ChunkCoordIntPair chunkcoordintpair, boolean existingOnly) throws IOException { return this.a(chunkcoordintpair, existingOnly); } // Paper - OBFHELPER
|
|
- private RegionFile a(ChunkCoordIntPair chunkcoordintpair, boolean existingOnly) throws IOException { // CraftBukkit
|
|
+ private synchronized RegionFile a(ChunkCoordIntPair chunkcoordintpair, boolean existingOnly) throws IOException { // CraftBukkit // Paper - synchronize for async io
|
|
long i = ChunkCoordIntPair.pair(chunkcoordintpair.getRegionX(), chunkcoordintpair.getRegionZ());
|
|
RegionFile regionfile = (RegionFile) this.cache.getAndMoveToFirst(i);
|
|
|
|
@@ -344,7 +344,7 @@ public abstract class RegionFileCache implements AutoCloseable {
|
|
}
|
|
|
|
// CraftBukkit start
|
|
- public boolean chunkExists(ChunkCoordIntPair pos) throws IOException {
|
|
+ public synchronized boolean chunkExists(ChunkCoordIntPair pos) throws IOException { // Paper - synchronize
|
|
copyIfNeeded(pos.x, pos.z); // Paper
|
|
RegionFile regionfile = a(pos, true);
|
|
|
|
diff --git a/src/main/java/net/minecraft/server/RegionFileSection.java b/src/main/java/net/minecraft/server/RegionFileSection.java
|
|
index a343a7b31d..7584174eb7 100644
|
|
--- a/src/main/java/net/minecraft/server/RegionFileSection.java
|
|
+++ b/src/main/java/net/minecraft/server/RegionFileSection.java
|
|
@@ -24,7 +24,7 @@ public class RegionFileSection<R extends MinecraftSerializable> extends RegionFi
|
|
|
|
private static final Logger LOGGER = LogManager.getLogger();
|
|
private final Long2ObjectMap<Optional<R>> b = new Long2ObjectOpenHashMap();
|
|
- private final LongLinkedOpenHashSet d = new LongLinkedOpenHashSet();
|
|
+ protected final LongLinkedOpenHashSet d = new LongLinkedOpenHashSet(); // Paper
|
|
private final BiFunction<Runnable, Dynamic<?>, R> e;
|
|
private final Function<Runnable, R> f;
|
|
private final DataFixer g;
|
|
@@ -39,8 +39,8 @@ public class RegionFileSection<R extends MinecraftSerializable> extends RegionFi
|
|
}
|
|
|
|
protected void a(BooleanSupplier booleansupplier) {
|
|
- while (!this.d.isEmpty() && booleansupplier.getAsBoolean()) {
|
|
- ChunkCoordIntPair chunkcoordintpair = SectionPosition.a(this.d.firstLong()).u();
|
|
+ while (!this.d.isEmpty() && booleansupplier.getAsBoolean()) { // Paper - conflict here to avoid obfhelpers
|
|
+ ChunkCoordIntPair chunkcoordintpair = SectionPosition.a(this.d.firstLong()).u(); // Paper - conflict here to avoid obfhelpers
|
|
|
|
this.d(chunkcoordintpair);
|
|
}
|
|
@@ -82,9 +82,9 @@ public class RegionFileSection<R extends MinecraftSerializable> extends RegionFi
|
|
Optional<R> optional = this.d(i);
|
|
|
|
if (optional.isPresent()) {
|
|
- return (MinecraftSerializable) optional.get();
|
|
+ return optional.get(); // Paper - decompile fix
|
|
} else {
|
|
- R r0 = (MinecraftSerializable) this.f.apply(() -> {
|
|
+ R r0 = this.f.apply(() -> { // Paper - decompile fix
|
|
this.a(i);
|
|
});
|
|
|
|
@@ -94,7 +94,12 @@ public class RegionFileSection<R extends MinecraftSerializable> extends RegionFi
|
|
}
|
|
|
|
private void b(ChunkCoordIntPair chunkcoordintpair) {
|
|
- this.a(chunkcoordintpair, DynamicOpsNBT.a, this.c(chunkcoordintpair));
|
|
+ // Paper start - load data in function
|
|
+ this.loadInData(chunkcoordintpair, this.c(chunkcoordintpair));
|
|
+ }
|
|
+ public void loadInData(ChunkCoordIntPair chunkPos, NBTTagCompound compound) {
|
|
+ this.a(chunkPos, DynamicOpsNBT.a, compound);
|
|
+ // Paper end
|
|
}
|
|
|
|
@Nullable
|
|
@@ -123,7 +128,7 @@ public class RegionFileSection<R extends MinecraftSerializable> extends RegionFi
|
|
for (int l = 0; l < 16; ++l) {
|
|
long i1 = SectionPosition.a(chunkcoordintpair, l).v();
|
|
Optional<R> optional = optionaldynamic.get(Integer.toString(l)).get().map((dynamic2) -> {
|
|
- return (MinecraftSerializable) this.e.apply(() -> {
|
|
+ return this.e.apply(() -> { // Paper - decompile fix
|
|
this.a(i1);
|
|
}, dynamic2);
|
|
});
|
|
@@ -142,7 +147,7 @@ public class RegionFileSection<R extends MinecraftSerializable> extends RegionFi
|
|
}
|
|
|
|
private void d(ChunkCoordIntPair chunkcoordintpair) {
|
|
- Dynamic<NBTBase> dynamic = this.a(chunkcoordintpair, DynamicOpsNBT.a);
|
|
+ Dynamic<NBTBase> dynamic = this.a(chunkcoordintpair, DynamicOpsNBT.a); // Paper - conflict here to avoid adding obfhelpers :)
|
|
NBTBase nbtbase = (NBTBase) dynamic.getValue();
|
|
|
|
if (nbtbase instanceof NBTTagCompound) {
|
|
@@ -157,6 +162,20 @@ public class RegionFileSection<R extends MinecraftSerializable> extends RegionFi
|
|
|
|
}
|
|
|
|
+ // Paper start - internal get data function, copied from above
|
|
+ private NBTTagCompound getDataInternal(ChunkCoordIntPair chunkcoordintpair) {
|
|
+ Dynamic<NBTBase> dynamic = this.a(chunkcoordintpair, DynamicOpsNBT.a);
|
|
+ NBTBase nbtbase = (NBTBase) dynamic.getValue();
|
|
+
|
|
+ if (nbtbase instanceof NBTTagCompound) {
|
|
+ return (NBTTagCompound)nbtbase;
|
|
+ } else {
|
|
+ RegionFileSection.LOGGER.error("Expected compound tag, got {}", nbtbase);
|
|
+ }
|
|
+ return null;
|
|
+ }
|
|
+ // Paper end
|
|
+
|
|
private <T> Dynamic<T> a(ChunkCoordIntPair chunkcoordintpair, DynamicOps<T> dynamicops) {
|
|
Map<T, T> map = Maps.newHashMap();
|
|
|
|
@@ -193,9 +212,9 @@ public class RegionFileSection<R extends MinecraftSerializable> extends RegionFi
|
|
public void a(ChunkCoordIntPair chunkcoordintpair) {
|
|
if (!this.d.isEmpty()) {
|
|
for (int i = 0; i < 16; ++i) {
|
|
- long j = SectionPosition.a(chunkcoordintpair, i).v();
|
|
+ long j = SectionPosition.a(chunkcoordintpair, i).v(); // Paper - conflict here to avoid obfhelpers
|
|
|
|
- if (this.d.contains(j)) {
|
|
+ if (this.d.contains(j)) { // Paper - conflict here to avoid obfhelpers
|
|
this.d(chunkcoordintpair);
|
|
return;
|
|
}
|
|
@@ -203,4 +222,21 @@ public class RegionFileSection<R extends MinecraftSerializable> extends RegionFi
|
|
}
|
|
|
|
}
|
|
+
|
|
+ // Paper start - get data function
|
|
+ public NBTTagCompound getData(ChunkCoordIntPair chunkcoordintpair) {
|
|
+ // Note: Copied from above
|
|
+ // This is checking if the data exists, then it builds it later in getDataInternal(ChunkCoordIntPair)
|
|
+ if (!this.d.isEmpty()) {
|
|
+ for (int i = 0; i < 16; ++i) {
|
|
+ long j = SectionPosition.a(chunkcoordintpair, i).v();
|
|
+
|
|
+ if (this.d.contains(j)) {
|
|
+ return this.getDataInternal(chunkcoordintpair);
|
|
+ }
|
|
+ }
|
|
+ }
|
|
+ return null;
|
|
+ }
|
|
+ // Paper end
|
|
}
|
|
diff --git a/src/main/java/net/minecraft/server/TicketType.java b/src/main/java/net/minecraft/server/TicketType.java
|
|
index 5acb0732c3..0ed2d2fbf9 100644
|
|
--- a/src/main/java/net/minecraft/server/TicketType.java
|
|
+++ b/src/main/java/net/minecraft/server/TicketType.java
|
|
@@ -22,6 +22,7 @@ public class TicketType<T> {
|
|
public static final TicketType<Unit> PLUGIN = a("plugin", (a, b) -> 0); // CraftBukkit
|
|
public static final TicketType<org.bukkit.plugin.Plugin> PLUGIN_TICKET = a("plugin_ticket", (plugin1, plugin2) -> plugin1.getClass().getName().compareTo(plugin2.getClass().getName())); // Craftbukkit
|
|
public static final TicketType<Integer> ANTIXRAY = a("antixray", Integer::compareTo); // Paper - Anti-Xray
|
|
+ public static final TicketType<Long> ASYNC_LOAD = a("async_load", Long::compareTo); // Paper
|
|
|
|
public static <T> TicketType<T> a(String s, Comparator<T> comparator) {
|
|
return new TicketType<>(s, comparator, 0L);
|
|
diff --git a/src/main/java/net/minecraft/server/VillagePlace.java b/src/main/java/net/minecraft/server/VillagePlace.java
|
|
index 7bc473e1ef..8e82326c47 100644
|
|
--- a/src/main/java/net/minecraft/server/VillagePlace.java
|
|
+++ b/src/main/java/net/minecraft/server/VillagePlace.java
|
|
@@ -20,8 +20,16 @@ public class VillagePlace extends RegionFileSection<VillagePlaceSection> {
|
|
|
|
private final VillagePlace.a a = new VillagePlace.a();
|
|
|
|
+ private final WorldServer world; // Paper
|
|
+
|
|
public VillagePlace(File file, DataFixer datafixer) {
|
|
+ // Paper start
|
|
+ this(file, datafixer, null);
|
|
+ }
|
|
+ public VillagePlace(File file, DataFixer datafixer, WorldServer world) {
|
|
+ // Paper end
|
|
super(file, VillagePlaceSection::new, VillagePlaceSection::new, datafixer, DataFixTypes.POI_CHUNK);
|
|
+ this.world = world; // Paper
|
|
}
|
|
|
|
public void a(BlockPosition blockposition, VillagePlaceType villageplacetype) {
|
|
@@ -128,7 +136,23 @@ public class VillagePlace extends RegionFileSection<VillagePlaceSection> {
|
|
|
|
@Override
|
|
public void a(BooleanSupplier booleansupplier) {
|
|
- super.a(booleansupplier);
|
|
+ // Paper start - async chunk io
|
|
+ if (this.world == null) {
|
|
+ super.a(booleansupplier);
|
|
+ } else {
|
|
+ //super.a(booleansupplier); // re-implement below
|
|
+ while (!((RegionFileSection)this).d.isEmpty() && booleansupplier.getAsBoolean()) {
|
|
+ ChunkCoordIntPair chunkcoordintpair = SectionPosition.a(((RegionFileSection)this).d.firstLong()).u();
|
|
+
|
|
+ NBTTagCompound data;
|
|
+ try (co.aikar.timings.Timing ignored1 = this.world.timings.poiSaveDataSerialization.startTiming()) {
|
|
+ data = this.getData(chunkcoordintpair);
|
|
+ }
|
|
+ com.destroystokyo.paper.io.ConcreteFileIOThread.Holder.INSTANCE.scheduleSave(this.world,
|
|
+ chunkcoordintpair.x, chunkcoordintpair.z, data, null, com.destroystokyo.paper.io.PrioritizedTaskQueue.LOW_PRIORITY);
|
|
+ }
|
|
+ }
|
|
+ // Paper end
|
|
this.a.a();
|
|
}
|
|
|
|
@@ -164,7 +188,7 @@ public class VillagePlace extends RegionFileSection<VillagePlaceSection> {
|
|
}
|
|
|
|
private static boolean a(ChunkSection chunksection) {
|
|
- Stream stream = VillagePlaceType.e();
|
|
+ Stream<IBlockData> stream = VillagePlaceType.e(); // Paper - decompile fix
|
|
|
|
chunksection.getClass();
|
|
return stream.anyMatch(chunksection::a);
|
|
@@ -214,6 +238,42 @@ public class VillagePlace extends RegionFileSection<VillagePlaceSection> {
|
|
}
|
|
}
|
|
|
|
+ // Paper start - Asynchronous chunk io
|
|
+ @javax.annotation.Nullable
|
|
+ @Override
|
|
+ public NBTTagCompound read(ChunkCoordIntPair chunkcoordintpair) throws java.io.IOException {
|
|
+ if (this.world != null && Thread.currentThread() != com.destroystokyo.paper.io.ConcreteFileIOThread.Holder.INSTANCE) {
|
|
+ NBTTagCompound ret = com.destroystokyo.paper.io.ConcreteFileIOThread.Holder.INSTANCE
|
|
+ .loadChunkDataAsyncFuture(this.world, chunkcoordintpair.x, chunkcoordintpair.z, com.destroystokyo.paper.io.IOUtil.getPriorityForCurrentThread(),
|
|
+ true, false, true).join().poiData;
|
|
+
|
|
+ if (ret == com.destroystokyo.paper.io.ConcreteFileIOThread.FAILURE_VALUE) {
|
|
+ throw new java.io.IOException("See logs for further detail");
|
|
+ }
|
|
+ return ret;
|
|
+ }
|
|
+ return super.read(chunkcoordintpair);
|
|
+ }
|
|
+
|
|
+ @Override
|
|
+ public void write(ChunkCoordIntPair chunkcoordintpair, NBTTagCompound nbttagcompound) throws java.io.IOException {
|
|
+ if (this.world != null && Thread.currentThread() != com.destroystokyo.paper.io.ConcreteFileIOThread.Holder.INSTANCE) {
|
|
+ com.destroystokyo.paper.io.ConcreteFileIOThread.Holder.INSTANCE.scheduleSave(
|
|
+ this.world, chunkcoordintpair.x, chunkcoordintpair.z, nbttagcompound, null,
|
|
+ com.destroystokyo.paper.io.IOUtil.getPriorityForCurrentThread());
|
|
+
|
|
+ Boolean ret = com.destroystokyo.paper.io.ConcreteFileIOThread.Holder.INSTANCE.waitForIOToComplete(this.world,
|
|
+ chunkcoordintpair.x, chunkcoordintpair.z, true, true);
|
|
+
|
|
+ if (ret == Boolean.FALSE) {
|
|
+ throw new java.io.IOException("See logs for further detail");
|
|
+ }
|
|
+ return;
|
|
+ }
|
|
+ super.write(chunkcoordintpair, nbttagcompound);
|
|
+ }
|
|
+ // Paper end
|
|
+
|
|
public static enum Occupancy {
|
|
|
|
HAS_SPACE(VillagePlaceRecord::d), IS_OCCUPIED(VillagePlaceRecord::e), ANY((villageplacerecord) -> {
|
|
@@ -222,7 +282,7 @@ public class VillagePlace extends RegionFileSection<VillagePlaceSection> {
|
|
|
|
private final Predicate<? super VillagePlaceRecord> d;
|
|
|
|
- private Occupancy(Predicate predicate) {
|
|
+ private Occupancy(Predicate<? super VillagePlaceRecord> predicate) { // Paper - decompile fix
|
|
this.d = predicate;
|
|
}
|
|
|
|
diff --git a/src/main/java/net/minecraft/server/WorldServer.java b/src/main/java/net/minecraft/server/WorldServer.java
|
|
index 47005dcfdc..f7597d499f 100644
|
|
--- a/src/main/java/net/minecraft/server/WorldServer.java
|
|
+++ b/src/main/java/net/minecraft/server/WorldServer.java
|
|
@@ -75,6 +75,79 @@ public class WorldServer extends World {
|
|
return new Throwable(entity + " Added to world at " + new java.util.Date());
|
|
}
|
|
|
|
+ // Paper start - Asynchronous IO
|
|
+ public final com.destroystokyo.paper.io.ConcreteFileIOThread.ChunkDataController poiDataController = new com.destroystokyo.paper.io.ConcreteFileIOThread.ChunkDataController() {
|
|
+ @Override
|
|
+ public void writeData(int x, int z, NBTTagCompound compound) throws java.io.IOException {
|
|
+ WorldServer.this.getChunkProvider().playerChunkMap.getVillagePlace().write(new ChunkCoordIntPair(x, z), compound);
|
|
+ }
|
|
+
|
|
+ @Override
|
|
+ public NBTTagCompound readData(int x, int z) throws java.io.IOException {
|
|
+ return WorldServer.this.getChunkProvider().playerChunkMap.getVillagePlace().read(new ChunkCoordIntPair(x, z));
|
|
+ }
|
|
+
|
|
+ @Override
|
|
+ public <T> T computeForRegionFile(int chunkX, int chunkZ, java.util.function.Function<RegionFile, T> function) {
|
|
+ synchronized (WorldServer.this.getChunkProvider().playerChunkMap.getVillagePlace()) {
|
|
+ RegionFile file;
|
|
+
|
|
+ try {
|
|
+ file = WorldServer.this.getChunkProvider().playerChunkMap.getVillagePlace().getRegionFile(new ChunkCoordIntPair(chunkX, chunkZ), false);
|
|
+ } catch (java.io.IOException ex) {
|
|
+ throw new RuntimeException(ex);
|
|
+ }
|
|
+
|
|
+ return function.apply(file);
|
|
+ }
|
|
+ }
|
|
+
|
|
+ @Override
|
|
+ public <T> T computeForRegionFileIfLoaded(int chunkX, int chunkZ, java.util.function.Function<RegionFile, T> function) {
|
|
+ synchronized (WorldServer.this.getChunkProvider().playerChunkMap.getVillagePlace()) {
|
|
+ RegionFile file = WorldServer.this.getChunkProvider().playerChunkMap.getVillagePlace().getRegionFileIfLoaded(new ChunkCoordIntPair(chunkX, chunkZ));
|
|
+ return function.apply(file);
|
|
+ }
|
|
+ }
|
|
+ };
|
|
+
|
|
+ public final com.destroystokyo.paper.io.ConcreteFileIOThread.ChunkDataController chunkDataController = new com.destroystokyo.paper.io.ConcreteFileIOThread.ChunkDataController() {
|
|
+ @Override
|
|
+ public void writeData(int x, int z, NBTTagCompound compound) throws java.io.IOException {
|
|
+ WorldServer.this.getChunkProvider().playerChunkMap.write(new ChunkCoordIntPair(x, z), compound);
|
|
+ }
|
|
+
|
|
+ @Override
|
|
+ public NBTTagCompound readData(int x, int z) throws java.io.IOException {
|
|
+ return WorldServer.this.getChunkProvider().playerChunkMap.read(new ChunkCoordIntPair(x, z));
|
|
+ }
|
|
+
|
|
+ @Override
|
|
+ public <T> T computeForRegionFile(int chunkX, int chunkZ, java.util.function.Function<RegionFile, T> function) {
|
|
+ synchronized (WorldServer.this.getChunkProvider().playerChunkMap) {
|
|
+ RegionFile file;
|
|
+
|
|
+ try {
|
|
+ file = WorldServer.this.getChunkProvider().playerChunkMap.getRegionFile(new ChunkCoordIntPair(chunkX, chunkZ), false);
|
|
+ } catch (java.io.IOException ex) {
|
|
+ throw new RuntimeException(ex);
|
|
+ }
|
|
+
|
|
+ return function.apply(file);
|
|
+ }
|
|
+ }
|
|
+
|
|
+ @Override
|
|
+ public <T> T computeForRegionFileIfLoaded(int chunkX, int chunkZ, java.util.function.Function<RegionFile, T> function) {
|
|
+ synchronized (WorldServer.this.getChunkProvider().playerChunkMap) {
|
|
+ RegionFile file = WorldServer.this.getChunkProvider().playerChunkMap.getRegionFileIfLoaded(new ChunkCoordIntPair(chunkX, chunkZ));
|
|
+ return function.apply(file);
|
|
+ }
|
|
+ }
|
|
+ };
|
|
+ public final com.destroystokyo.paper.io.chunk.ChunkLoadTaskManager asyncLoadManager;
|
|
+ // Paper end
|
|
+
|
|
// Add env and gen to constructor
|
|
public WorldServer(MinecraftServer minecraftserver, Executor executor, WorldNBTStorage worldnbtstorage, WorldData worlddata, DimensionManager dimensionmanager, GameProfilerFiller gameprofilerfiller, WorldLoadListener worldloadlistener, org.bukkit.World.Environment env, org.bukkit.generator.ChunkGenerator gen) {
|
|
super(worlddata, dimensionmanager, (world, worldprovider) -> {
|
|
@@ -119,6 +192,7 @@ public class WorldServer extends World {
|
|
|
|
this.mobSpawnerTrader = this.worldProvider.getDimensionManager().getType() == DimensionManager.OVERWORLD ? new MobSpawnerTrader(this) : null; // CraftBukkit - getType()
|
|
this.getServer().addWorld(this.getWorld()); // CraftBukkit
|
|
+ this.asyncLoadManager = new com.destroystokyo.paper.io.chunk.ChunkLoadTaskManager(this, 4); // todo CONFIGURABLE // Paper
|
|
}
|
|
|
|
public void doTick(BooleanSupplier booleansupplier) {
|
|
diff --git a/src/main/java/org/bukkit/craftbukkit/CraftWorld.java b/src/main/java/org/bukkit/craftbukkit/CraftWorld.java
|
|
index c5321c5076..d3d43f3b77 100644
|
|
--- a/src/main/java/org/bukkit/craftbukkit/CraftWorld.java
|
|
+++ b/src/main/java/org/bukkit/craftbukkit/CraftWorld.java
|
|
@@ -531,22 +531,23 @@ public class CraftWorld implements World {
|
|
}
|
|
|
|
if (!generate) {
|
|
- net.minecraft.server.RegionFile file;
|
|
- try {
|
|
- file = world.getChunkProvider().playerChunkMap.getRegionFile(chunkPos, false);
|
|
- } catch (IOException ex) {
|
|
- throw new RuntimeException(ex);
|
|
- }
|
|
+ ChunkStatus status = world.getChunkProvider().playerChunkMap.getStatusOnDiskNoLoad(x, z); // Paper - async io - move to own method
|
|
|
|
- ChunkStatus status = file.getStatusIfCached(x, z);
|
|
- if (!file.chunkExists(chunkPos) || (status != null && status != ChunkStatus.FULL)) {
|
|
+ // Paper start - async io
|
|
+ if (status == ChunkStatus.EMPTY) {
|
|
+ // does not exist on disk
|
|
return false;
|
|
}
|
|
|
|
+ if (status == null) { // at this stage we don't know what it is on disk
|
|
IChunkAccess chunk = world.getChunkProvider().getChunkAt(x, z, ChunkStatus.EMPTY, true);
|
|
if (!(chunk instanceof ProtoChunkExtension) && !(chunk instanceof net.minecraft.server.Chunk)) {
|
|
return false;
|
|
}
|
|
+ } else if (status != ChunkStatus.FULL) {
|
|
+ return false; // not full status on disk
|
|
+ }
|
|
+ // Paper end
|
|
|
|
// fall through to load
|
|
// we do this so we do not re-read the chunk data on disk
|
|
@@ -2323,16 +2324,17 @@ public class CraftWorld implements World {
|
|
|
|
@Override
|
|
public CompletableFuture<Chunk> getChunkAtAsync(int x, int z, boolean gen) {
|
|
- // TODO placeholder
|
|
- if (Bukkit.isPrimaryThread()) {
|
|
- return CompletableFuture.completedFuture(getChunkAtGen(x, z, gen));
|
|
- } else {
|
|
- CompletableFuture<Chunk> ret = new CompletableFuture<>();
|
|
- net.minecraft.server.MinecraftServer.getServer().scheduleOnMain(() -> {
|
|
- ret.complete(getChunkAtGen(x, z, gen));
|
|
- });
|
|
- return ret;
|
|
+ net.minecraft.server.Chunk immediate = this.world.getChunkProvider().getChunkAtIfLoadedImmediately(x, z);
|
|
+ if (immediate != null) {
|
|
+ return CompletableFuture.completedFuture(immediate.bukkitChunk);
|
|
}
|
|
+
|
|
+ CompletableFuture<Chunk> ret = new CompletableFuture<>();
|
|
+ this.world.getChunkProvider().getChunkAtAsynchronously(x, z, gen, (net.minecraft.server.Chunk chunk) -> {
|
|
+ ret.complete(chunk == null ? null : chunk.bukkitChunk);
|
|
+ });
|
|
+
|
|
+ return ret;
|
|
}
|
|
// Paper end
|
|
|
|
--
|
|
2.20.1
|
|
|