mirror of
https://github.com/PaperMC/Velocity.git
synced 2026-04-20 03:18:17 +02:00
Reintroduce sync event execution to the Velocity event system
This required a not-insubstantial number of bug fixes, since the sync support had bit-rotted somewhat. This PR also corrects a number of bugs. Finally. the per-plugin executor services are now used to execute all async event tasks.
This commit is contained in:
@@ -540,7 +540,6 @@ public class VelocityServer implements ProxyServer, ForwardingAudience {
|
||||
|
||||
eventManager.fire(new ProxyShutdownEvent()).join();
|
||||
|
||||
timedOut = !eventManager.shutdown() || timedOut;
|
||||
timedOut = !scheduler.shutdown() || timedOut;
|
||||
|
||||
if (timedOut) {
|
||||
|
||||
@@ -49,6 +49,8 @@ import java.util.Locale;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.ForkJoinPool;
|
||||
import java.util.concurrent.locks.ReadWriteLock;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
import java.util.stream.Collectors;
|
||||
@@ -71,6 +73,7 @@ public class VelocityCommandManager implements CommandManager {
|
||||
private final SuggestionsProvider<CommandSource> suggestionsProvider;
|
||||
private final CommandGraphInjector<CommandSource> injector;
|
||||
private final Map<String, CommandMeta> commandMetas;
|
||||
private final ExecutorService asyncExecutor;
|
||||
|
||||
/**
|
||||
* Constructs a command manager.
|
||||
@@ -89,6 +92,7 @@ public class VelocityCommandManager implements CommandManager {
|
||||
this.suggestionsProvider = new SuggestionsProvider<>(this.dispatcher, this.lock.readLock());
|
||||
this.injector = new CommandGraphInjector<>(this.dispatcher, this.lock.readLock());
|
||||
this.commandMetas = new ConcurrentHashMap<>();
|
||||
this.asyncExecutor = ForkJoinPool.commonPool(); // TODO: remove entirely
|
||||
}
|
||||
|
||||
public void setAnnounceProxyCommands(boolean announceProxyCommands) {
|
||||
@@ -266,7 +270,7 @@ public class VelocityCommandManager implements CommandManager {
|
||||
return false;
|
||||
}
|
||||
return executeImmediately0(source, commandResult.getCommand().orElse(event.getCommand()));
|
||||
}, eventManager.getAsyncExecutor());
|
||||
}, asyncExecutor);
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -275,8 +279,7 @@ public class VelocityCommandManager implements CommandManager {
|
||||
Preconditions.checkNotNull(source, "source");
|
||||
Preconditions.checkNotNull(cmdLine, "cmdLine");
|
||||
|
||||
return CompletableFuture.supplyAsync(
|
||||
() -> executeImmediately0(source, cmdLine), eventManager.getAsyncExecutor());
|
||||
return CompletableFuture.supplyAsync(() -> executeImmediately0(source, cmdLine), asyncExecutor);
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -25,7 +25,6 @@ import com.google.common.base.VerifyException;
|
||||
import com.google.common.collect.ArrayListMultimap;
|
||||
import com.google.common.collect.ListMultimap;
|
||||
import com.google.common.reflect.TypeToken;
|
||||
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||
import com.velocitypowered.api.event.Continuation;
|
||||
import com.velocitypowered.api.event.EventHandler;
|
||||
import com.velocitypowered.api.event.EventManager;
|
||||
@@ -38,6 +37,7 @@ import com.velocitypowered.api.plugin.PluginManager;
|
||||
import com.velocitypowered.proxy.event.UntargetedEventHandler.EventTaskHandler;
|
||||
import com.velocitypowered.proxy.event.UntargetedEventHandler.VoidHandler;
|
||||
import com.velocitypowered.proxy.event.UntargetedEventHandler.WithContinuationHandler;
|
||||
import com.velocitypowered.proxy.util.collect.Enum2IntMap;
|
||||
import java.lang.invoke.MethodHandle;
|
||||
import java.lang.invoke.MethodHandles;
|
||||
import java.lang.invoke.VarHandle;
|
||||
@@ -55,9 +55,6 @@ import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.locks.ReadWriteLock;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
import java.util.function.BiConsumer;
|
||||
@@ -76,6 +73,14 @@ import org.lanternpowered.lmbda.LambdaType;
|
||||
*/
|
||||
public class VelocityEventManager implements EventManager {
|
||||
|
||||
private static final Enum2IntMap<PostOrder> POST_ORDER_MAP = new Enum2IntMap.Builder<>(PostOrder.class)
|
||||
.put(PostOrder.FIRST, Short.MAX_VALUE - 1)
|
||||
.put(PostOrder.EARLY, Short.MAX_VALUE / 2)
|
||||
.put(PostOrder.NORMAL, 0)
|
||||
.put(PostOrder.LATE, Short.MIN_VALUE / 2)
|
||||
.put(PostOrder.LAST, Short.MIN_VALUE + 1)
|
||||
.put(PostOrder.CUSTOM, 0)
|
||||
.build();
|
||||
private static final Logger logger = LogManager.getLogger(VelocityEventManager.class);
|
||||
|
||||
private static final MethodHandles.Lookup methodHandlesLookup = MethodHandles.lookup();
|
||||
@@ -87,9 +92,8 @@ public class VelocityEventManager implements EventManager {
|
||||
LambdaType.of(WithContinuationHandler.class);
|
||||
|
||||
private static final Comparator<HandlerRegistration> handlerComparator =
|
||||
Comparator.comparingInt(o -> o.order);
|
||||
Collections.reverseOrder(Comparator.comparingInt(o -> o.order));
|
||||
|
||||
private final ExecutorService asyncExecutor;
|
||||
private final PluginManager pluginManager;
|
||||
|
||||
private final ListMultimap<Class<?>, HandlerRegistration> handlersByType =
|
||||
@@ -112,9 +116,6 @@ public class VelocityEventManager implements EventManager {
|
||||
*/
|
||||
public VelocityEventManager(final PluginManager pluginManager) {
|
||||
this.pluginManager = pluginManager;
|
||||
this.asyncExecutor = Executors
|
||||
.newFixedThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactoryBuilder()
|
||||
.setNameFormat("Velocity Async Event Executor - #%d").setDaemon(true).build());
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -140,6 +141,7 @@ public class VelocityEventManager implements EventManager {
|
||||
final short order;
|
||||
final Class<?> eventType;
|
||||
final EventHandler<Object> handler;
|
||||
final AsyncType asyncType;
|
||||
|
||||
/**
|
||||
* The instance of the {@link EventHandler} or the listener instance that was registered.
|
||||
@@ -147,20 +149,40 @@ public class VelocityEventManager implements EventManager {
|
||||
final Object instance;
|
||||
|
||||
public HandlerRegistration(final PluginContainer plugin, final short order,
|
||||
final Class<?> eventType, final Object instance, final EventHandler<Object> handler) {
|
||||
final Class<?> eventType, final Object instance, final EventHandler<Object> handler,
|
||||
final AsyncType asyncType) {
|
||||
this.plugin = plugin;
|
||||
this.order = order;
|
||||
this.eventType = eventType;
|
||||
this.instance = instance;
|
||||
this.handler = handler;
|
||||
this.asyncType = asyncType;
|
||||
}
|
||||
}
|
||||
|
||||
enum AsyncType {
|
||||
/**
|
||||
* The event will never run async, everything is handled on the netty thread.
|
||||
*/
|
||||
NEVER,
|
||||
/**
|
||||
* The event will initially start on the thread calling the {@code fire} method, and possibly
|
||||
* switch over to an async thread.
|
||||
*/
|
||||
SOMETIMES,
|
||||
/**
|
||||
* The complete event will be handled on an async thread.
|
||||
*/
|
||||
ALWAYS
|
||||
}
|
||||
|
||||
static final class HandlersCache {
|
||||
|
||||
final AsyncType asyncType;
|
||||
final HandlerRegistration[] handlers;
|
||||
|
||||
HandlersCache(final HandlerRegistration[] handlers) {
|
||||
HandlersCache(AsyncType asyncType, final HandlerRegistration[] handlers) {
|
||||
this.asyncType = asyncType;
|
||||
this.handlers = handlers;
|
||||
}
|
||||
}
|
||||
@@ -183,7 +205,15 @@ public class VelocityEventManager implements EventManager {
|
||||
}
|
||||
|
||||
baked.sort(handlerComparator);
|
||||
return new HandlersCache(baked.toArray(new HandlerRegistration[0]));
|
||||
|
||||
AsyncType asyncType = AsyncType.NEVER;
|
||||
for (HandlerRegistration registration : baked) {
|
||||
if (registration.asyncType.compareTo(asyncType) > 0) {
|
||||
asyncType = registration.asyncType;
|
||||
}
|
||||
}
|
||||
|
||||
return new HandlersCache(asyncType, baked.toArray(new HandlerRegistration[0]));
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -219,15 +249,17 @@ public class VelocityEventManager implements EventManager {
|
||||
static final class MethodHandlerInfo {
|
||||
|
||||
final Method method;
|
||||
final AsyncType asyncType;
|
||||
final @Nullable Class<?> eventType;
|
||||
final short order;
|
||||
final @Nullable String errors;
|
||||
final @Nullable Class<?> continuationType;
|
||||
|
||||
private MethodHandlerInfo(final Method method, final @Nullable Class<?> eventType,
|
||||
final short order, final @Nullable String errors,
|
||||
private MethodHandlerInfo(final Method method, final AsyncType asyncType,
|
||||
final @Nullable Class<?> eventType, final short order, final @Nullable String errors,
|
||||
final @Nullable Class<?> continuationType) {
|
||||
this.method = method;
|
||||
this.asyncType = asyncType;
|
||||
this.eventType = eventType;
|
||||
this.order = order;
|
||||
this.errors = errors;
|
||||
@@ -291,17 +323,41 @@ public class VelocityEventManager implements EventManager {
|
||||
}
|
||||
}
|
||||
}
|
||||
AsyncType asyncType = AsyncType.NEVER;
|
||||
final Class<?> returnType = method.getReturnType();
|
||||
if (handlerAdapter == null) {
|
||||
final Class<?> returnType = method.getReturnType();
|
||||
if (returnType != void.class && continuationType == Continuation.class) {
|
||||
errors.add("method return type must be void if a continuation parameter is provided");
|
||||
} else if (returnType != void.class && returnType != EventTask.class) {
|
||||
errors.add("method return type must be void or EventTask");
|
||||
errors.add("method return type must be void, AsyncTask, "
|
||||
+ "EventTask.Basic or EventTask.WithContinuation");
|
||||
} else if (returnType == EventTask.class) {
|
||||
// technically, for compatibility, we *should* assume that the method must be invoked
|
||||
// async, however, from examining some publicly-available plugins, developers did
|
||||
// generally follow the contract and returned an EventTask only if they wanted this
|
||||
// behavior. enable it for them.
|
||||
asyncType = AsyncType.SOMETIMES;
|
||||
}
|
||||
} else {
|
||||
// for custom handlers, we always expect a return type of EventTask. this feature appears
|
||||
// to have not been used in the wild AFAIK, so it gets the new behavior by default
|
||||
asyncType = AsyncType.SOMETIMES;
|
||||
}
|
||||
|
||||
if (paramCount == 1 && returnType == void.class && subscribe.async()) {
|
||||
// these are almost always a dead giveaway of a plugin that will need its handlers
|
||||
// run async, so unless we're told otherwise, we'll assume that's the case
|
||||
asyncType = AsyncType.ALWAYS;
|
||||
}
|
||||
|
||||
final short order;
|
||||
if (subscribe.order() == PostOrder.CUSTOM) {
|
||||
order = subscribe.priority();
|
||||
} else {
|
||||
order = (short) POST_ORDER_MAP.get(subscribe.order());
|
||||
}
|
||||
final short order = (short) subscribe.order().ordinal();
|
||||
final String errorsJoined = errors.isEmpty() ? null : String.join(",", errors);
|
||||
collected.put(key, new MethodHandlerInfo(method, eventType, order, errorsJoined,
|
||||
collected.put(key, new MethodHandlerInfo(method, asyncType, eventType, order, errorsJoined,
|
||||
continuationType));
|
||||
}
|
||||
final Class<?> superclass = targetClass.getSuperclass();
|
||||
@@ -340,12 +396,29 @@ public class VelocityEventManager implements EventManager {
|
||||
@SuppressWarnings("unchecked")
|
||||
public <E> void register(final Object plugin, final Class<E> eventClass,
|
||||
final PostOrder order, final EventHandler<E> handler) {
|
||||
if (order == PostOrder.CUSTOM) {
|
||||
throw new IllegalArgumentException(
|
||||
"This method does not support custom post orders. Use the overload with short instead."
|
||||
);
|
||||
}
|
||||
register(plugin, eventClass, (short) POST_ORDER_MAP.get(order), handler, AsyncType.ALWAYS);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <E> void register(Object plugin, Class<E> eventClass, short postOrder,
|
||||
EventHandler<E> handler) {
|
||||
register(plugin, eventClass, postOrder, handler, AsyncType.SOMETIMES);
|
||||
}
|
||||
|
||||
private <E> void register(Object plugin, Class<E> eventClass, short postOrder,
|
||||
EventHandler<E> handler, AsyncType asyncType) {
|
||||
final PluginContainer pluginContainer = pluginManager.ensurePluginContainer(plugin);
|
||||
requireNonNull(eventClass, "eventClass");
|
||||
requireNonNull(handler, "handler");
|
||||
|
||||
final HandlerRegistration registration = new HandlerRegistration(pluginContainer,
|
||||
(short) order.ordinal(), eventClass, handler, (EventHandler<Object>) handler);
|
||||
postOrder, eventClass, handler, (EventHandler<Object>) handler,
|
||||
AsyncType.ALWAYS);
|
||||
register(Collections.singletonList(registration));
|
||||
}
|
||||
|
||||
@@ -375,7 +448,7 @@ public class VelocityEventManager implements EventManager {
|
||||
|
||||
final EventHandler<Object> handler = untargetedHandler.buildHandler(listener);
|
||||
registrations.add(new HandlerRegistration(pluginContainer, info.order,
|
||||
info.eventType, listener, handler));
|
||||
info.eventType, listener, handler, info.asyncType));
|
||||
}
|
||||
|
||||
register(registrations);
|
||||
@@ -462,10 +535,13 @@ public class VelocityEventManager implements EventManager {
|
||||
|
||||
private <E> void fire(final @Nullable CompletableFuture<E> future,
|
||||
final E event, final HandlersCache handlersCache) {
|
||||
// In Velocity 1.1.0, all events were fired asynchronously. As Velocity 3.0.0 is intended to be
|
||||
// largely (albeit not 100%) compatible with 1.1.x, we also fire events async. This behavior
|
||||
// will go away in Velocity Polymer.
|
||||
asyncExecutor.execute(() -> fire(future, event, 0, true, handlersCache.handlers));
|
||||
final HandlerRegistration registration = handlersCache.handlers[0];
|
||||
if (registration.asyncType == AsyncType.ALWAYS) {
|
||||
registration.plugin.getExecutorService().execute(
|
||||
() -> fire(future, event, 0, true, handlersCache.handlers));
|
||||
} else {
|
||||
fire(future, event, 0, false, handlersCache.handlers);
|
||||
}
|
||||
}
|
||||
|
||||
private static final int TASK_STATE_DEFAULT = 0;
|
||||
@@ -494,6 +570,7 @@ public class VelocityEventManager implements EventManager {
|
||||
private final @Nullable CompletableFuture<E> future;
|
||||
private final boolean currentlyAsync;
|
||||
private final E event;
|
||||
private final Thread firedOnThread;
|
||||
|
||||
// This field is modified via a VarHandle, so this field is used and cannot be final.
|
||||
@SuppressWarnings({"UnusedVariable", "FieldMayBeFinal", "FieldCanBeLocal"})
|
||||
@@ -516,6 +593,7 @@ public class VelocityEventManager implements EventManager {
|
||||
this.event = event;
|
||||
this.index = index;
|
||||
this.currentlyAsync = currentlyAsync;
|
||||
this.firedOnThread = Thread.currentThread();
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -526,8 +604,8 @@ public class VelocityEventManager implements EventManager {
|
||||
}
|
||||
|
||||
/**
|
||||
* Executes the task and returns whether the next one should be executed immediately after this
|
||||
* one without scheduling.
|
||||
* Executes the task and returns whether the next handler should be executed immediately
|
||||
* after this one, without additional scheduling.
|
||||
*/
|
||||
boolean execute() {
|
||||
state = TASK_STATE_EXECUTING;
|
||||
@@ -569,7 +647,18 @@ public class VelocityEventManager implements EventManager {
|
||||
}
|
||||
if (!CONTINUATION_TASK_STATE.compareAndSet(
|
||||
this, TASK_STATE_EXECUTING, TASK_STATE_CONTINUE_IMMEDIATELY)) {
|
||||
asyncExecutor.execute(() -> fire(future, event, index + 1, true, registrations));
|
||||
// We established earlier that registrations[index + 1] is a valid index.
|
||||
// If we are remaining in the same thread for the next handler, fire
|
||||
// the next event immediately, else fire it within the executor service
|
||||
// of the plugin with the next handler.
|
||||
final HandlerRegistration next = registrations[index + 1];
|
||||
final Thread currentThread = Thread.currentThread();
|
||||
if (currentThread == firedOnThread && next.asyncType != AsyncType.ALWAYS) {
|
||||
fire(future, event, index + 1, currentlyAsync, registrations);
|
||||
} else {
|
||||
next.plugin.getExecutorService().execute(() ->
|
||||
fire(future, event, index + 1, true, registrations));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -595,7 +684,7 @@ public class VelocityEventManager implements EventManager {
|
||||
continue;
|
||||
}
|
||||
} else {
|
||||
asyncExecutor.execute(continuationTask);
|
||||
registration.plugin.getExecutorService().execute(continuationTask);
|
||||
}
|
||||
// fire will continue in another thread once the async task is
|
||||
// executed and the continuation is resumed
|
||||
@@ -615,13 +704,4 @@ public class VelocityEventManager implements EventManager {
|
||||
logger.error("Couldn't pass {} to {} {}", registration.eventType.getSimpleName(),
|
||||
pluginDescription.getId(), pluginDescription.getVersion().orElse(""), t);
|
||||
}
|
||||
|
||||
public boolean shutdown() throws InterruptedException {
|
||||
asyncExecutor.shutdown();
|
||||
return asyncExecutor.awaitTermination(10, TimeUnit.SECONDS);
|
||||
}
|
||||
|
||||
public ExecutorService getAsyncExecutor() {
|
||||
return asyncExecutor;
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user