From d1030c30967faa3b26ebc46551a56c767e90d3aa Mon Sep 17 00:00:00 2001 From: Andrew Steinborn Date: Sun, 14 May 2023 04:32:58 -0400 Subject: [PATCH] Reintroduce sync event execution to the Velocity event system This required a not-insubstantial number of bug fixes, since the sync support had bit-rotted somewhat. This PR also corrects a number of bugs. Finally. the per-plugin executor services are now used to execute all async event tasks. --- .../velocitypowered/api/event/Subscribe.java | 2 +- .../script/SetManifestImplVersionPlugin.kt | 2 +- .../velocitypowered/proxy/VelocityServer.java | 1 - .../proxy/command/VelocityCommandManager.java | 9 +- .../proxy/config/VelocityConfiguration.java | 1 - .../proxy/event/EventTypeTracker.java | 8 +- .../proxy/event/VelocityEventManager.java | 106 ++++++++----- .../proxy/command/CommandTestSuite.java | 11 -- .../proxy/event/EventTest.java | 146 +++++++++++++----- .../scheduler/VelocitySchedulerTest.java | 2 +- .../proxy/testutil/FakePluginManager.java | 29 ++-- 11 files changed, 204 insertions(+), 113 deletions(-) diff --git a/api/src/main/java/com/velocitypowered/api/event/Subscribe.java b/api/src/main/java/com/velocitypowered/api/event/Subscribe.java index bee71a3cb..9cfd57ff9 100644 --- a/api/src/main/java/com/velocitypowered/api/event/Subscribe.java +++ b/api/src/main/java/com/velocitypowered/api/event/Subscribe.java @@ -43,6 +43,6 @@ public @interface Subscribe { * * @return Requires async */ - boolean async() default true; + boolean async() default false; } diff --git a/buildSrc/src/main/kotlin/com/velocitypowered/script/SetManifestImplVersionPlugin.kt b/buildSrc/src/main/kotlin/com/velocitypowered/script/SetManifestImplVersionPlugin.kt index 5be27e608..dc0a66d51 100644 --- a/buildSrc/src/main/kotlin/com/velocitypowered/script/SetManifestImplVersionPlugin.kt +++ b/buildSrc/src/main/kotlin/com/velocitypowered/script/SetManifestImplVersionPlugin.kt @@ -45,7 +45,7 @@ class SetManifestImplVersionPlugin : Plugin { velocityHumanVersion = "${project.version} (git-$currentShortRevision)" } } else { - velocityHumanVersion = archiveVersion.get() + velocityHumanVersion = project.version.toString() } attributes["Implementation-Version"] = velocityHumanVersion } diff --git a/proxy/src/main/java/com/velocitypowered/proxy/VelocityServer.java b/proxy/src/main/java/com/velocitypowered/proxy/VelocityServer.java index 0c3ffd6bb..6d05258d7 100644 --- a/proxy/src/main/java/com/velocitypowered/proxy/VelocityServer.java +++ b/proxy/src/main/java/com/velocitypowered/proxy/VelocityServer.java @@ -521,7 +521,6 @@ public class VelocityServer implements ProxyServer, ForwardingAudience { eventManager.fire(new ProxyShutdownEvent()).join(); - timedOut = !eventManager.shutdown() || timedOut; timedOut = !scheduler.shutdown() || timedOut; if (timedOut) { diff --git a/proxy/src/main/java/com/velocitypowered/proxy/command/VelocityCommandManager.java b/proxy/src/main/java/com/velocitypowered/proxy/command/VelocityCommandManager.java index c4cacaf5f..7b5176124 100644 --- a/proxy/src/main/java/com/velocitypowered/proxy/command/VelocityCommandManager.java +++ b/proxy/src/main/java/com/velocitypowered/proxy/command/VelocityCommandManager.java @@ -46,6 +46,8 @@ import java.util.Locale; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ForkJoinPool; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.stream.Collectors; @@ -68,6 +70,7 @@ public class VelocityCommandManager implements CommandManager { private final SuggestionsProvider suggestionsProvider; private final CommandGraphInjector injector; private final Map commandMetas; + private final ExecutorService asyncExecutor; /** * Constructs a command manager. @@ -86,6 +89,7 @@ public class VelocityCommandManager implements CommandManager { this.suggestionsProvider = new SuggestionsProvider<>(this.dispatcher, this.lock.readLock()); this.injector = new CommandGraphInjector<>(this.dispatcher, this.lock.readLock()); this.commandMetas = new ConcurrentHashMap<>(); + this.asyncExecutor = ForkJoinPool.commonPool(); // TODO: remove entirely } public void setAnnounceProxyCommands(boolean announceProxyCommands) { @@ -251,7 +255,7 @@ public class VelocityCommandManager implements CommandManager { return false; } return executeImmediately0(source, commandResult.getCommand().orElse(event.getCommand())); - }, eventManager.getAsyncExecutor()); + }, asyncExecutor); } @Override @@ -260,8 +264,7 @@ public class VelocityCommandManager implements CommandManager { Preconditions.checkNotNull(source, "source"); Preconditions.checkNotNull(cmdLine, "cmdLine"); - return CompletableFuture.supplyAsync( - () -> executeImmediately0(source, cmdLine), eventManager.getAsyncExecutor()); + return CompletableFuture.supplyAsync(() -> executeImmediately0(source, cmdLine), asyncExecutor); } /** diff --git a/proxy/src/main/java/com/velocitypowered/proxy/config/VelocityConfiguration.java b/proxy/src/main/java/com/velocitypowered/proxy/config/VelocityConfiguration.java index 598d0ca05..edda3c4cf 100644 --- a/proxy/src/main/java/com/velocitypowered/proxy/config/VelocityConfiguration.java +++ b/proxy/src/main/java/com/velocitypowered/proxy/config/VelocityConfiguration.java @@ -29,7 +29,6 @@ import com.velocitypowered.api.util.Favicon; import com.velocitypowered.proxy.util.AddressUtil; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.io.IOException; -import java.net.InetSocketAddress; import java.net.SocketAddress; import java.net.URL; import java.nio.charset.StandardCharsets; diff --git a/proxy/src/main/java/com/velocitypowered/proxy/event/EventTypeTracker.java b/proxy/src/main/java/com/velocitypowered/proxy/event/EventTypeTracker.java index 94d94fa8a..d4f65c5a0 100644 --- a/proxy/src/main/java/com/velocitypowered/proxy/event/EventTypeTracker.java +++ b/proxy/src/main/java/com/velocitypowered/proxy/event/EventTypeTracker.java @@ -37,18 +37,14 @@ class EventTypeTracker { return ImmutableSet.copyOf(friends.get(eventType)); } - final Collection> types = getEventTypes(eventType); + final Collection> types = getFriendTypes(eventType); for (Class type : types) { - if (type == eventType) { - continue; - } - friends.put(type, eventType); } return types; } - private static Collection> getEventTypes(final Class eventType) { + private static Collection> getFriendTypes(final Class eventType) { return TypeToken.of(eventType).getTypes().rawTypes().stream() .filter(type -> type != Object.class) .collect(Collectors.toList()); diff --git a/proxy/src/main/java/com/velocitypowered/proxy/event/VelocityEventManager.java b/proxy/src/main/java/com/velocitypowered/proxy/event/VelocityEventManager.java index 5c8542708..d22243c61 100644 --- a/proxy/src/main/java/com/velocitypowered/proxy/event/VelocityEventManager.java +++ b/proxy/src/main/java/com/velocitypowered/proxy/event/VelocityEventManager.java @@ -25,7 +25,6 @@ import com.google.common.base.VerifyException; import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.ListMultimap; import com.google.common.reflect.TypeToken; -import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.velocitypowered.api.event.Continuation; import com.velocitypowered.api.event.EventHandler; import com.velocitypowered.api.event.EventManager; @@ -54,9 +53,6 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.BiConsumer; @@ -88,7 +84,6 @@ public class VelocityEventManager implements EventManager { private static final Comparator handlerComparator = Comparator.comparingInt(o -> o.order); - private final ExecutorService asyncExecutor; private final PluginManager pluginManager; private final ListMultimap, HandlerRegistration> handlersByType = @@ -111,9 +106,6 @@ public class VelocityEventManager implements EventManager { */ public VelocityEventManager(final PluginManager pluginManager) { this.pluginManager = pluginManager; - this.asyncExecutor = Executors - .newFixedThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactoryBuilder() - .setNameFormat("Velocity Async Event Executor - #%d").setDaemon(true).build()); } /** @@ -139,6 +131,7 @@ public class VelocityEventManager implements EventManager { final short order; final Class eventType; final EventHandler handler; + final AsyncType asyncType; /** * The instance of the {@link EventHandler} or the listener instance that was registered. @@ -146,31 +139,40 @@ public class VelocityEventManager implements EventManager { final Object instance; public HandlerRegistration(final PluginContainer plugin, final short order, - final Class eventType, final Object instance, final EventHandler handler) { + final Class eventType, final Object instance, final EventHandler handler, + final AsyncType asyncType) { this.plugin = plugin; this.order = order; this.eventType = eventType; this.instance = instance; this.handler = handler; + this.asyncType = asyncType; } } enum AsyncType { - /** - * The complete event will be handled on an async thread. - */ - ALWAYS, /** * The event will never run async, everything is handled on the netty thread. */ - NEVER + NEVER, + /** + * The event will initially start on the thread calling the {@code fire} method, and possibly + * switch over to an async thread. + */ + SOMETIMES, + /** + * The complete event will be handled on an async thread. + */ + ALWAYS } static final class HandlersCache { + final AsyncType asyncType; final HandlerRegistration[] handlers; - HandlersCache(final HandlerRegistration[] handlers) { + HandlersCache(AsyncType asyncType, final HandlerRegistration[] handlers) { + this.asyncType = asyncType; this.handlers = handlers; } } @@ -193,7 +195,15 @@ public class VelocityEventManager implements EventManager { } baked.sort(handlerComparator); - return new HandlersCache(baked.toArray(new HandlerRegistration[0])); + + AsyncType asyncType = AsyncType.NEVER; + for (HandlerRegistration registration : baked) { + if (registration.asyncType.compareTo(asyncType) > 0) { + asyncType = registration.asyncType; + } + } + + return new HandlersCache(asyncType, baked.toArray(new HandlerRegistration[0])); } /** @@ -229,15 +239,17 @@ public class VelocityEventManager implements EventManager { static final class MethodHandlerInfo { final Method method; + final AsyncType asyncType; final @Nullable Class eventType; final short order; final @Nullable String errors; final @Nullable Class continuationType; - private MethodHandlerInfo(final Method method, final @Nullable Class eventType, - final short order, final @Nullable String errors, + private MethodHandlerInfo(final Method method, final AsyncType asyncType, + final @Nullable Class eventType, final short order, final @Nullable String errors, final @Nullable Class continuationType) { this.method = method; + this.asyncType = asyncType; this.eventType = eventType; this.order = order; this.errors = errors; @@ -301,18 +313,26 @@ public class VelocityEventManager implements EventManager { } } } + AsyncType asyncType = AsyncType.NEVER; if (handlerAdapter == null) { final Class returnType = method.getReturnType(); if (returnType != void.class && continuationType == Continuation.class) { errors.add("method return type must be void if a continuation parameter is provided"); } else if (returnType != void.class && returnType != EventTask.class) { errors.add("method return type must be void, AsyncTask, " - + "AsyncTask.Basic or AsyncTask.WithContinuation"); + + "EventTask.Basic or EventTask.WithContinuation"); + } else if (returnType == EventTask.class) { + asyncType = AsyncType.SOMETIMES; } + } else { + asyncType = AsyncType.SOMETIMES; + } + if (subscribe.async()) { + asyncType = AsyncType.ALWAYS; } final short order = (short) subscribe.order().ordinal(); final String errorsJoined = errors.isEmpty() ? null : String.join(",", errors); - collected.put(key, new MethodHandlerInfo(method, eventType, order, errorsJoined, + collected.put(key, new MethodHandlerInfo(method, asyncType, eventType, order, errorsJoined, continuationType)); } final Class superclass = targetClass.getSuperclass(); @@ -356,7 +376,8 @@ public class VelocityEventManager implements EventManager { requireNonNull(handler, "handler"); final HandlerRegistration registration = new HandlerRegistration(pluginContainer, - (short) order.ordinal(), eventClass, handler, (EventHandler) handler); + (short) order.ordinal(), eventClass, handler, (EventHandler) handler, + AsyncType.SOMETIMES); register(Collections.singletonList(registration)); } @@ -386,7 +407,7 @@ public class VelocityEventManager implements EventManager { final EventHandler handler = untargetedHandler.buildHandler(listener); registrations.add(new HandlerRegistration(pluginContainer, info.order, - info.eventType, listener, handler)); + info.eventType, listener, handler, info.asyncType)); } register(registrations); @@ -473,10 +494,13 @@ public class VelocityEventManager implements EventManager { private void fire(final @Nullable CompletableFuture future, final E event, final HandlersCache handlersCache) { - // In Velocity 1.1.0, all events were fired asynchronously. As Velocity 3.0.0 is intended to be - // largely (albeit not 100%) compatible with 1.1.x, we also fire events async. This behavior - // will go away in Velocity Polymer. - asyncExecutor.execute(() -> fire(future, event, 0, true, handlersCache.handlers)); + final HandlerRegistration registration = handlersCache.handlers[0]; + if (registration.asyncType == AsyncType.ALWAYS) { + registration.plugin.getExecutorService().execute( + () -> fire(future, event, 0, true, handlersCache.handlers)); + } else { + fire(future, event, 0, false, handlersCache.handlers); + } } private static final int TASK_STATE_DEFAULT = 0; @@ -505,6 +529,7 @@ public class VelocityEventManager implements EventManager { private final @Nullable CompletableFuture future; private final boolean currentlyAsync; private final E event; + private final Thread firedOnThread; // This field is modified via a VarHandle, so this field is used and cannot be final. @SuppressWarnings({"UnusedVariable", "FieldMayBeFinal", "FieldCanBeLocal"}) @@ -527,6 +552,7 @@ public class VelocityEventManager implements EventManager { this.event = event; this.index = index; this.currentlyAsync = currentlyAsync; + this.firedOnThread = Thread.currentThread(); } @Override @@ -537,8 +563,8 @@ public class VelocityEventManager implements EventManager { } /** - * Executes the task and returns whether the next one should be executed immediately after this - * one without scheduling. + * Executes the task and returns whether the next handler should be executed immediately + * after this one, without additional scheduling. */ boolean execute() { state = TASK_STATE_EXECUTING; @@ -580,7 +606,18 @@ public class VelocityEventManager implements EventManager { } if (!CONTINUATION_TASK_STATE.compareAndSet( this, TASK_STATE_EXECUTING, TASK_STATE_CONTINUE_IMMEDIATELY)) { - asyncExecutor.execute(() -> fire(future, event, index + 1, true, registrations)); + // We established earlier that registrations[index + 1] is a valid index. + // If we are remaining in the same thread for the next handler, fire + // the next event immediately, else fire it within the executor service + // of the plugin with the next handler. + final HandlerRegistration next = registrations[index + 1]; + final Thread currentThread = Thread.currentThread(); + if (currentThread == firedOnThread && next.asyncType != AsyncType.ALWAYS) { + fire(future, event, index + 1, currentlyAsync, registrations); + } else { + next.plugin.getExecutorService().execute(() -> + fire(future, event, index + 1, true, registrations)); + } } } @@ -606,7 +643,7 @@ public class VelocityEventManager implements EventManager { continue; } } else { - asyncExecutor.execute(continuationTask); + registration.plugin.getExecutorService().execute(continuationTask); } // fire will continue in another thread once the async task is // executed and the continuation is resumed @@ -625,13 +662,4 @@ public class VelocityEventManager implements EventManager { logger.error("Couldn't pass {} to {}", registration.eventType.getSimpleName(), registration.plugin.getDescription().getId(), t); } - - public boolean shutdown() throws InterruptedException { - asyncExecutor.shutdown(); - return asyncExecutor.awaitTermination(10, TimeUnit.SECONDS); - } - - public ExecutorService getAsyncExecutor() { - return asyncExecutor; - } } \ No newline at end of file diff --git a/proxy/src/test/java/com/velocitypowered/proxy/command/CommandTestSuite.java b/proxy/src/test/java/com/velocitypowered/proxy/command/CommandTestSuite.java index 9bd202fa4..c8e17d0f8 100644 --- a/proxy/src/test/java/com/velocitypowered/proxy/command/CommandTestSuite.java +++ b/proxy/src/test/java/com/velocitypowered/proxy/command/CommandTestSuite.java @@ -31,7 +31,6 @@ import com.velocitypowered.proxy.event.MockEventManager; import com.velocitypowered.proxy.event.VelocityEventManager; import java.util.Arrays; import java.util.Collection; -import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; @@ -47,16 +46,6 @@ abstract class CommandTestSuite { eventManager = new MockEventManager(); } - @AfterAll - static void afterAll() { - try { - eventManager.shutdown(); - eventManager = null; - } catch (final InterruptedException e) { - Thread.currentThread().interrupt(); - } - } - @BeforeEach void setUp() { this.manager = new VelocityCommandManager(eventManager); diff --git a/proxy/src/test/java/com/velocitypowered/proxy/event/EventTest.java b/proxy/src/test/java/com/velocitypowered/proxy/event/EventTest.java index e05df200f..fae6cc45e 100644 --- a/proxy/src/test/java/com/velocitypowered/proxy/event/EventTest.java +++ b/proxy/src/test/java/com/velocitypowered/proxy/event/EventTest.java @@ -41,20 +41,24 @@ import org.junit.jupiter.api.TestInstance; public class EventTest { public static final String CONTINUATION_TEST_THREAD_NAME = "Continuation test thread"; - private final VelocityEventManager eventManager = - new VelocityEventManager(new FakePluginManager()); + private final FakePluginManager pluginManager = new FakePluginManager(); + private final VelocityEventManager eventManager = new VelocityEventManager(pluginManager); @AfterAll void shutdown() throws Exception { - eventManager.shutdown(); + pluginManager.shutdown(); } static final class TestEvent { } + static void assertSyncThread(final Thread thread) { + assertEquals(Thread.currentThread(), thread); + } + static void assertAsyncThread(final Thread thread) { - assertTrue(thread.getName().contains("Velocity Async Event Executor")); + assertTrue(thread.getName().contains("Test Async Thread")); } static void assertContinuationThread(final Thread thread) { @@ -90,6 +94,7 @@ public class EventTest { eventManager.fire(new TestEvent()).get(); } finally { eventManager.unregisterListeners(FakePluginManager.PLUGIN_A); + eventManager.unregisterListeners(FakePluginManager.PLUGIN_B); } // Check that the order is A < B < C. @@ -119,6 +124,7 @@ public class EventTest { eventManager.fire(new TestEvent()).get(); } finally { eventManager.unregisterListeners(FakePluginManager.PLUGIN_A); + eventManager.unregisterListeners(FakePluginManager.PLUGIN_B); } // Check that the order is A < B < C. @@ -126,6 +132,26 @@ public class EventTest { assertTrue(listener2Invoked.get() < listener3Invoked.get(), "Listener C invoked before B!"); } + @Test + void testAlwaysSync() throws Exception { + final AlwaysSyncListener listener = new AlwaysSyncListener(); + handleMethodListener(listener); + assertSyncThread(listener.thread); + assertEquals(1, listener.result); + } + + static final class AlwaysSyncListener { + + @MonotonicNonNull Thread thread; + int result; + + @Subscribe + void sync(TestEvent event) { + result++; + thread = Thread.currentThread(); + } + } + @Test void testAlwaysAsync() throws Exception { final AlwaysAsyncListener listener = new AlwaysAsyncListener(); @@ -143,7 +169,7 @@ public class EventTest { @MonotonicNonNull Thread threadC; int result; - @Subscribe + @Subscribe(async = true, order = PostOrder.EARLY) void firstAsync(TestEvent event) { result++; threadA = Thread.currentThread(); @@ -155,50 +181,93 @@ public class EventTest { return EventTask.async(() -> result++); } - @Subscribe + @Subscribe(order = PostOrder.LATE) void thirdAsync(TestEvent event) { result++; threadC = Thread.currentThread(); } } + @Test + void testSometimesAsync() throws Exception { + final SometimesAsyncListener listener = new SometimesAsyncListener(); + handleMethodListener(listener); + assertSyncThread(listener.threadA); + assertSyncThread(listener.threadB); + assertAsyncThread(listener.threadC); + assertAsyncThread(listener.threadD); + assertEquals(3, listener.result); + } + + static final class SometimesAsyncListener { + + @MonotonicNonNull Thread threadA; + @MonotonicNonNull Thread threadB; + @MonotonicNonNull Thread threadC; + @MonotonicNonNull Thread threadD; + int result; + + @Subscribe(order = PostOrder.EARLY) + void notAsync(TestEvent event) { + result++; + threadA = Thread.currentThread(); + } + + @Subscribe + EventTask notAsyncUntilTask(TestEvent event) { + threadB = Thread.currentThread(); + return EventTask.async(() -> { + threadC = Thread.currentThread(); + result++; + }); + } + + @Subscribe(order = PostOrder.LATE) + void stillAsyncAfterTask(TestEvent event) { + threadD = Thread.currentThread(); + result++; + } + } + @Test void testContinuation() throws Exception { final ContinuationListener listener = new ContinuationListener(); handleMethodListener(listener); - assertAsyncThread(listener.thread1); - assertAsyncThread(listener.thread2); - assertContinuationThread(listener.thread2Custom); - assertAsyncThread(listener.thread3); + assertSyncThread(listener.threadA); + assertSyncThread(listener.threadB); + assertAsyncThread(listener.threadC); assertEquals(2, listener.value.get()); } static final class ContinuationListener { - @MonotonicNonNull Thread thread1; - @MonotonicNonNull Thread thread2; - @MonotonicNonNull Thread thread2Custom; - @MonotonicNonNull Thread thread3; + @MonotonicNonNull Thread threadA; + @MonotonicNonNull Thread threadB; + @MonotonicNonNull Thread threadC; final AtomicInteger value = new AtomicInteger(); @Subscribe(order = PostOrder.EARLY) EventTask continuation(TestEvent event) { - thread1 = Thread.currentThread(); + threadA = Thread.currentThread(); return EventTask.withContinuation(continuation -> { value.incrementAndGet(); - thread2 = Thread.currentThread(); + threadB = Thread.currentThread(); new Thread(() -> { - thread2Custom = Thread.currentThread(); + try { + Thread.sleep(100); + } catch (InterruptedException e) { + e.printStackTrace(); + } value.incrementAndGet(); continuation.resume(); - }, CONTINUATION_TEST_THREAD_NAME).start(); + }).start(); }); } @Subscribe(order = PostOrder.LATE) void afterContinuation(TestEvent event) { - thread3 = Thread.currentThread(); + threadC = Thread.currentThread(); } } @@ -207,9 +276,9 @@ public class EventTest { final ResumeContinuationImmediatelyListener listener = new ResumeContinuationImmediatelyListener(); handleMethodListener(listener); - assertAsyncThread(listener.threadA); - assertAsyncThread(listener.threadB); - assertAsyncThread(listener.threadC); + assertSyncThread(listener.threadA); + assertSyncThread(listener.threadB); + assertSyncThread(listener.threadC); assertEquals(2, listener.result); } @@ -241,42 +310,44 @@ public class EventTest { void testContinuationParameter() throws Exception { final ContinuationParameterListener listener = new ContinuationParameterListener(); handleMethodListener(listener); - assertAsyncThread(listener.thread1); - assertAsyncThread(listener.thread2); - assertContinuationThread(listener.thread2Custom); - assertAsyncThread(listener.thread3); + assertSyncThread(listener.threadA); + assertSyncThread(listener.threadB); + assertAsyncThread(listener.threadC); assertEquals(3, listener.result.get()); } static final class ContinuationParameterListener { - @MonotonicNonNull Thread thread1; - @MonotonicNonNull Thread thread2; - @MonotonicNonNull Thread thread2Custom; - @MonotonicNonNull Thread thread3; + @MonotonicNonNull Thread threadA; + @MonotonicNonNull Thread threadB; + @MonotonicNonNull Thread threadC; final AtomicInteger result = new AtomicInteger(); @Subscribe void resume(TestEvent event, Continuation continuation) { - thread1 = Thread.currentThread(); + threadA = Thread.currentThread(); result.incrementAndGet(); continuation.resume(); } @Subscribe(order = PostOrder.LATE) void resumeFromCustomThread(TestEvent event, Continuation continuation) { - thread2 = Thread.currentThread(); + threadB = Thread.currentThread(); new Thread(() -> { - thread2Custom = Thread.currentThread(); + try { + Thread.sleep(100); + } catch (InterruptedException e) { + e.printStackTrace(); + } result.incrementAndGet(); continuation.resume(); - }, CONTINUATION_TEST_THREAD_NAME).start(); + }).start(); } @Subscribe(order = PostOrder.LAST) void afterCustomThread(TestEvent event, Continuation continuation) { - thread3 = Thread.currentThread(); + threadC = Thread.currentThread(); result.incrementAndGet(); continuation.resume(); } @@ -328,8 +399,7 @@ public class EventTest { + "the second is the fancy continuation"); } }, - new TypeToken>() { - }, + new TypeToken>() {}, invokeFunction -> (instance, event) -> EventTask.withContinuation(continuation -> invokeFunction.accept(instance, event, new FancyContinuationImpl(continuation)) @@ -349,4 +419,4 @@ public class EventTest { continuation.resume(); } } -} \ No newline at end of file +} diff --git a/proxy/src/test/java/com/velocitypowered/proxy/scheduler/VelocitySchedulerTest.java b/proxy/src/test/java/com/velocitypowered/proxy/scheduler/VelocitySchedulerTest.java index 70cd20b10..e31a8a7e3 100644 --- a/proxy/src/test/java/com/velocitypowered/proxy/scheduler/VelocitySchedulerTest.java +++ b/proxy/src/test/java/com/velocitypowered/proxy/scheduler/VelocitySchedulerTest.java @@ -76,12 +76,12 @@ class VelocitySchedulerTest { scheduler.buildTask(FakePluginManager.PLUGIN_A, task -> { runningLatch.countDown(); - task.cancel(); try { endingLatch.await(); } catch (InterruptedException ignored) { Thread.currentThread().interrupt(); } + task.cancel(); }).delay(50, TimeUnit.MILLISECONDS) .repeat(Duration.ofMillis(5)) .schedule(); diff --git a/proxy/src/test/java/com/velocitypowered/proxy/testutil/FakePluginManager.java b/proxy/src/test/java/com/velocitypowered/proxy/testutil/FakePluginManager.java index 04ba3867d..b1bc1af72 100644 --- a/proxy/src/test/java/com/velocitypowered/proxy/testutil/FakePluginManager.java +++ b/proxy/src/test/java/com/velocitypowered/proxy/testutil/FakePluginManager.java @@ -18,6 +18,7 @@ package com.velocitypowered.proxy.testutil; import com.google.common.collect.ImmutableList; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.velocitypowered.api.plugin.PluginContainer; import com.velocitypowered.api.plugin.PluginDescription; import com.velocitypowered.api.plugin.PluginManager; @@ -25,7 +26,7 @@ import java.nio.file.Path; import java.util.Collection; import java.util.Optional; import java.util.concurrent.ExecutorService; -import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.Executors; import org.checkerframework.checker.nullness.qual.NonNull; /** @@ -36,15 +37,19 @@ public class FakePluginManager implements PluginManager { public static final Object PLUGIN_A = new Object(); public static final Object PLUGIN_B = new Object(); - private static final PluginContainer PC_A = new FakePluginContainer("a", PLUGIN_A); - private static final PluginContainer PC_B = new FakePluginContainer("b", PLUGIN_B); + private final PluginContainer containerA = new FakePluginContainer("a", PLUGIN_A); + private final PluginContainer containerB = new FakePluginContainer("b", PLUGIN_B); + + private ExecutorService service = Executors.newCachedThreadPool( + new ThreadFactoryBuilder().setNameFormat("Test Async Thread").setDaemon(true).build() + ); @Override public @NonNull Optional fromInstance(@NonNull Object instance) { if (instance == PLUGIN_A) { - return Optional.of(PC_A); + return Optional.of(containerA); } else if (instance == PLUGIN_B) { - return Optional.of(PC_B); + return Optional.of(containerB); } else { return Optional.empty(); } @@ -54,9 +59,9 @@ public class FakePluginManager implements PluginManager { public @NonNull Optional getPlugin(@NonNull String id) { switch (id) { case "a": - return Optional.of(PC_A); + return Optional.of(containerA); case "b": - return Optional.of(PC_B); + return Optional.of(containerB); default: return Optional.empty(); } @@ -64,7 +69,7 @@ public class FakePluginManager implements PluginManager { @Override public @NonNull Collection getPlugins() { - return ImmutableList.of(PC_A, PC_B); + return ImmutableList.of(containerA, containerB); } @Override @@ -77,16 +82,18 @@ public class FakePluginManager implements PluginManager { throw new UnsupportedOperationException(); } - private static class FakePluginContainer implements PluginContainer { + public void shutdown() { + this.service.shutdownNow(); + } + + private class FakePluginContainer implements PluginContainer { private final String id; private final Object instance; - private final ExecutorService service; private FakePluginContainer(String id, Object instance) { this.id = id; this.instance = instance; - this.service = ForkJoinPool.commonPool(); } @Override