From c2fd3c07ac1f317e2700aead1a2338fe11152ce2 Mon Sep 17 00:00:00 2001 From: Wouter Gritter Date: Wed, 11 Feb 2026 22:21:03 +0100 Subject: [PATCH] Introduce SchedulerBackend to fix VelocitySchedulerTest intermittent failure (#1728) --- .../scheduler/ExecutorSchedulerBackend.java | 70 +++++ .../proxy/scheduler/SchedulerBackend.java | 56 ++++ .../proxy/scheduler/VelocityScheduler.java | 24 +- .../DeterministicSchedulerBackend.java | 257 ++++++++++++++++++ .../scheduler/VelocitySchedulerTest.java | 58 ++-- 5 files changed, 433 insertions(+), 32 deletions(-) create mode 100644 proxy/src/main/java/com/velocitypowered/proxy/scheduler/ExecutorSchedulerBackend.java create mode 100644 proxy/src/main/java/com/velocitypowered/proxy/scheduler/SchedulerBackend.java create mode 100644 proxy/src/test/java/com/velocitypowered/proxy/scheduler/DeterministicSchedulerBackend.java diff --git a/proxy/src/main/java/com/velocitypowered/proxy/scheduler/ExecutorSchedulerBackend.java b/proxy/src/main/java/com/velocitypowered/proxy/scheduler/ExecutorSchedulerBackend.java new file mode 100644 index 000000000..d10eab6f4 --- /dev/null +++ b/proxy/src/main/java/com/velocitypowered/proxy/scheduler/ExecutorSchedulerBackend.java @@ -0,0 +1,70 @@ +/* + * Copyright (C) 2018-2026 Velocity Contributors + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +package com.velocitypowered.proxy.scheduler; + +import static com.google.common.base.Preconditions.checkNotNull; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +/** + * A {@link SchedulerBackend} backed by a real {@link ScheduledExecutorService}. + */ +public class ExecutorSchedulerBackend implements SchedulerBackend { + + private final ScheduledExecutorService executor; + + /** + * Creates a ExecutorSchedulerBackend with a default executor. + */ + public ExecutorSchedulerBackend() { + this(Executors.newSingleThreadScheduledExecutor( + new ThreadFactoryBuilder() + .setDaemon(true) + .setNameFormat("Velocity Task Scheduler Timer") + .build() + )); + } + + /** + * Creates a ExecutorSchedulerBackend with a given executor. + * + * @param executor The executor to use. + */ + public ExecutorSchedulerBackend(ScheduledExecutorService executor) { + this.executor = checkNotNull(executor, "executor"); + } + + @Override + public ScheduledFuture schedule(Runnable task, long delay, TimeUnit unit) { + return executor.schedule(task, delay, unit); + } + + @Override + public ScheduledFuture scheduleAtFixedRate(Runnable task, long initialDelay, long period, TimeUnit unit) { + return executor.scheduleAtFixedRate(task, initialDelay, period, unit); + } + + @Override + public void shutdown() { + executor.shutdown(); + } +} diff --git a/proxy/src/main/java/com/velocitypowered/proxy/scheduler/SchedulerBackend.java b/proxy/src/main/java/com/velocitypowered/proxy/scheduler/SchedulerBackend.java new file mode 100644 index 000000000..ab251ca27 --- /dev/null +++ b/proxy/src/main/java/com/velocitypowered/proxy/scheduler/SchedulerBackend.java @@ -0,0 +1,56 @@ +/* + * Copyright (C) 2018-2026 Velocity Contributors + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +package com.velocitypowered.proxy.scheduler; + +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +/** + * Backend interface used by {@link VelocityScheduler} to schedule timer callbacks. + * + *

This is an internal abstraction that allows tests to replace the real-time scheduler + * with a deterministic implementation. + */ +interface SchedulerBackend { + + /** + * Schedules a task to run once after the given delay. + * + * @param task the task to run + * @param delay the delay + * @param unit the delay unit + * @return a future representing the scheduled task + */ + ScheduledFuture schedule(Runnable task, long delay, TimeUnit unit); + + /** + * Schedules a task to run at a fixed rate. + * + * @param task the task to run + * @param initialDelay the initial delay + * @param period the period between runs + * @param unit the time unit + * @return a future representing the scheduled task + */ + ScheduledFuture scheduleAtFixedRate(Runnable task, long initialDelay, long period, TimeUnit unit); + + /** + * Shuts down the backend. + */ + void shutdown(); +} diff --git a/proxy/src/main/java/com/velocitypowered/proxy/scheduler/VelocityScheduler.java b/proxy/src/main/java/com/velocitypowered/proxy/scheduler/VelocityScheduler.java index 727832bd4..0a38582a7 100644 --- a/proxy/src/main/java/com/velocitypowered/proxy/scheduler/VelocityScheduler.java +++ b/proxy/src/main/java/com/velocitypowered/proxy/scheduler/VelocityScheduler.java @@ -1,5 +1,5 @@ /* - * Copyright (C) 2018-2023 Velocity Contributors + * Copyright (C) 2018-2026 Velocity Contributors * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by @@ -23,7 +23,6 @@ import static com.google.common.base.Preconditions.checkNotNull; import com.google.common.collect.ImmutableList; import com.google.common.collect.Multimap; import com.google.common.collect.Multimaps; -import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.velocitypowered.api.plugin.PluginContainer; import com.velocitypowered.api.plugin.PluginManager; import com.velocitypowered.api.scheduler.ScheduledTask; @@ -40,7 +39,6 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; @@ -60,20 +58,23 @@ import org.jetbrains.annotations.VisibleForTesting; public class VelocityScheduler implements Scheduler { private final PluginManager pluginManager; - private final ScheduledExecutorService timerExecutionService; + private final SchedulerBackend backend; private final Multimap tasksByPlugin = Multimaps.synchronizedMultimap( Multimaps.newSetMultimap(new IdentityHashMap<>(), HashSet::new)); /** - * Initalizes the scheduler. + * Initializes the scheduler. * * @param pluginManager the Velocity plugin manager */ public VelocityScheduler(PluginManager pluginManager) { + this(pluginManager, new ExecutorSchedulerBackend()); + } + + @VisibleForTesting + VelocityScheduler(PluginManager pluginManager, SchedulerBackend backend) { this.pluginManager = pluginManager; - this.timerExecutionService = Executors - .newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setDaemon(true) - .setNameFormat("Velocity Task Scheduler Timer").build()); + this.backend = backend; } @Override @@ -118,7 +119,7 @@ public class VelocityScheduler implements Scheduler { for (ScheduledTask task : terminating) { task.cancel(); } - timerExecutionService.shutdown(); + backend.shutdown(); final List plugins = new ArrayList<>(this.pluginManager.getPlugins()); final Iterator pluginIterator = plugins.iterator(); while (pluginIterator.hasNext()) { @@ -232,10 +233,9 @@ public class VelocityScheduler implements Scheduler { void schedule() { if (repeat == 0) { - this.future = timerExecutionService.schedule(this, delay, TimeUnit.MILLISECONDS); + this.future = backend.schedule(this, delay, TimeUnit.MILLISECONDS); } else { - this.future = timerExecutionService - .scheduleAtFixedRate(this, delay, repeat, TimeUnit.MILLISECONDS); + this.future = backend.scheduleAtFixedRate(this, delay, repeat, TimeUnit.MILLISECONDS); } } diff --git a/proxy/src/test/java/com/velocitypowered/proxy/scheduler/DeterministicSchedulerBackend.java b/proxy/src/test/java/com/velocitypowered/proxy/scheduler/DeterministicSchedulerBackend.java new file mode 100644 index 000000000..8a472ceb4 --- /dev/null +++ b/proxy/src/test/java/com/velocitypowered/proxy/scheduler/DeterministicSchedulerBackend.java @@ -0,0 +1,257 @@ +/* + * Copyright (C) 2018-2026 Velocity Contributors + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +package com.velocitypowered.proxy.scheduler; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +import java.time.Duration; +import java.util.PriorityQueue; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Delayed; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +/** + * A deterministic {@link SchedulerBackend} for tests. + * + *

This backend does not use the wall clock. Tests manually advance time, and all due tasks + * are executed deterministically on the calling thread. + */ +class DeterministicSchedulerBackend implements SchedulerBackend { + + private final Object lock = new Object(); + private final PriorityQueue queue = new PriorityQueue<>(); + private boolean shutdown; + private long nowNanos; + private long seq; + + @Override + public ScheduledFuture schedule(Runnable task, long delay, TimeUnit unit) { + checkNotNull(task, "task"); + checkNotNull(unit, "unit"); + return enqueue(task, unit.toNanos(delay), 0); + } + + @Override + public ScheduledFuture scheduleAtFixedRate(Runnable task, long initialDelay, long period, TimeUnit unit) { + checkNotNull(task, "task"); + checkNotNull(unit, "unit"); + checkArgument(period > 0, "period must be > 0"); + return enqueue(task, unit.toNanos(initialDelay), unit.toNanos(period)); + } + + @Override + public void shutdown() { + synchronized (lock) { + shutdown = true; + queue.clear(); + } + } + + /** + * Runs all tasks that are due "now" without advancing time. + */ + void runUntilIdle() { + drainDueTasks(); + } + + /** + * Advances virtual time and runs all tasks that become due. + * + * @param duration the amount of time to advance + */ + void advance(Duration duration) { + checkNotNull(duration, "duration"); + advance(duration.toNanos()); + } + + /** + * Advances virtual time and runs all tasks that become due. + * + * @param time the time to advance + * @param unit the unit + */ + void advance(long time, TimeUnit unit) { + checkNotNull(unit, "unit"); + advance(unit.toNanos(time)); + } + + private void advance(long nanos) { + if (nanos < 0) { + throw new IllegalArgumentException("nanos must be >= 0"); + } + synchronized (lock) { + nowNanos += nanos; + } + drainDueTasks(); + } + + private ScheduledFuture enqueue(Runnable task, long delayNanos, long periodNanos) { + synchronized (lock) { + if (shutdown) { + throw new java.util.concurrent.RejectedExecutionException("backend is shut down"); + } + Entry entry = new Entry(task, nowNanos + Math.max(0, delayNanos), periodNanos, seq++); + entry.future = new FutureImpl(entry); + queue.add(entry); + return entry.future; + } + } + + private void drainDueTasks() { + while (true) { + Entry entry; + synchronized (lock) { + entry = queue.peek(); + if (entry == null || entry.nextRunNanos > nowNanos) { + return; + } + queue.poll(); + } + + // Run outside the lock to avoid deadlocks if tasks schedule more work. + if (!entry.future.isCancelled()) { + try { + entry.task.run(); + } finally { + // no-op + } + } + + synchronized (lock) { + if (entry.future.isCancelled()) { + // Cancelled tasks are not re-queued. + continue; + } + + if (entry.periodNanos == 0) { + entry.future.complete(); + } else { + // Fixed-rate semantics: next run time is based on the scheduled time, not completion time. + entry.nextRunNanos = entry.nextRunNanos + entry.periodNanos; + queue.add(entry); + } + } + } + } + + private final class FutureImpl implements ScheduledFuture { + + private final Entry entry; + private final CountDownLatch completion = new CountDownLatch(1); + private volatile boolean cancelled; + private volatile boolean done; + + private FutureImpl(Entry entry) { + this.entry = entry; + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + synchronized (lock) { + if (done) { + return false; + } + cancelled = true; + done = true; + queue.remove(entry); + } + completion.countDown(); + return true; + } + + @Override + public boolean isCancelled() { + return cancelled; + } + + @Override + public boolean isDone() { + return done; + } + + void complete() { + if (!done) { + done = true; + completion.countDown(); + } + } + + @Override + public Object get() throws InterruptedException { + completion.await(); + if (cancelled) { + throw new CancellationException(); + } + return null; + } + + @Override + public Object get(long timeout, TimeUnit unit) throws InterruptedException, java.util.concurrent.TimeoutException { + if (!completion.await(timeout, unit)) { + throw new java.util.concurrent.TimeoutException(); + } + if (cancelled) { + throw new CancellationException(); + } + return null; + } + + @Override + public long getDelay(TimeUnit unit) { + synchronized (lock) { + long remaining = Math.max(0, entry.nextRunNanos - nowNanos); + return unit.convert(remaining, TimeUnit.NANOSECONDS); + } + } + + @Override + public int compareTo(Delayed o) { + long d1 = getDelay(TimeUnit.NANOSECONDS); + long d2 = o.getDelay(TimeUnit.NANOSECONDS); + return Long.compare(d1, d2); + } + } + + private static final class Entry implements Comparable { + + private final Runnable task; + private final long periodNanos; + private final long sequence; + private long nextRunNanos; + private FutureImpl future; + + private Entry(Runnable task, long nextRunNanos, long periodNanos, long sequence) { + this.task = task; + this.nextRunNanos = nextRunNanos; + this.periodNanos = periodNanos; + this.sequence = sequence; + } + + @Override + public int compareTo(Entry other) { + int cmp = Long.compare(this.nextRunNanos, other.nextRunNanos); + if (cmp != 0) { + return cmp; + } + return Long.compare(this.sequence, other.sequence); + } + } +} diff --git a/proxy/src/test/java/com/velocitypowered/proxy/scheduler/VelocitySchedulerTest.java b/proxy/src/test/java/com/velocitypowered/proxy/scheduler/VelocitySchedulerTest.java index e31a8a7e3..15b8f1e9f 100644 --- a/proxy/src/test/java/com/velocitypowered/proxy/scheduler/VelocitySchedulerTest.java +++ b/proxy/src/test/java/com/velocitypowered/proxy/scheduler/VelocitySchedulerTest.java @@ -1,5 +1,5 @@ /* - * Copyright (C) 2018-2023 Velocity Contributors + * Copyright (C) 2018-2026 Velocity Contributors * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by @@ -18,6 +18,7 @@ package com.velocitypowered.proxy.scheduler; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; import com.velocitypowered.api.scheduler.ScheduledTask; import com.velocitypowered.api.scheduler.TaskStatus; @@ -31,22 +32,26 @@ import java.util.concurrent.atomic.AtomicReference; import org.junit.jupiter.api.Test; class VelocitySchedulerTest { - // TODO: The timings here will be inaccurate on slow systems. @Test void buildTask() throws Exception { - VelocityScheduler scheduler = new VelocityScheduler(new FakePluginManager()); + DeterministicSchedulerBackend backend = new DeterministicSchedulerBackend(); + VelocityScheduler scheduler = new VelocityScheduler(new FakePluginManager(), backend); + CountDownLatch latch = new CountDownLatch(1); - ScheduledTask task = scheduler.buildTask(FakePluginManager.PLUGIN_A, latch::countDown) - .schedule(); - latch.await(); + ScheduledTask task = scheduler.buildTask(FakePluginManager.PLUGIN_A, latch::countDown).schedule(); + + backend.runUntilIdle(); // runs tasks due at t=0 + assertTrue(latch.await(5, TimeUnit.SECONDS)); + ((VelocityTask) task).awaitCompletion(); assertEquals(TaskStatus.FINISHED, task.status()); } @Test - void cancelWorks() throws Exception { - VelocityScheduler scheduler = new VelocityScheduler(new FakePluginManager()); + void cancelWorks() { + DeterministicSchedulerBackend backend = new DeterministicSchedulerBackend(); + VelocityScheduler scheduler = new VelocityScheduler(new FakePluginManager(), backend); AtomicInteger i = new AtomicInteger(3); ScheduledTask task = scheduler.buildTask(FakePluginManager.PLUGIN_A, i::decrementAndGet) .delay(100, TimeUnit.SECONDS) @@ -58,19 +63,26 @@ class VelocitySchedulerTest { @Test void repeatTaskWorks() throws Exception { - VelocityScheduler scheduler = new VelocityScheduler(new FakePluginManager()); + DeterministicSchedulerBackend backend = new DeterministicSchedulerBackend(); + VelocityScheduler scheduler = new VelocityScheduler(new FakePluginManager(), backend); + CountDownLatch latch = new CountDownLatch(3); ScheduledTask task = scheduler.buildTask(FakePluginManager.PLUGIN_A, latch::countDown) .delay(100, TimeUnit.MILLISECONDS) .repeat(100, TimeUnit.MILLISECONDS) .schedule(); - latch.await(); + + backend.advance(300, TimeUnit.MILLISECONDS); // triggers 3 timer firings deterministically + assertTrue(latch.await(5, TimeUnit.SECONDS)); + task.cancel(); } @Test void obtainTasksFromPlugin() throws Exception { - VelocityScheduler scheduler = new VelocityScheduler(new FakePluginManager()); + DeterministicSchedulerBackend backend = new DeterministicSchedulerBackend(); + VelocityScheduler scheduler = new VelocityScheduler(new FakePluginManager(), backend); + CountDownLatch runningLatch = new CountDownLatch(1); CountDownLatch endingLatch = new CountDownLatch(1); @@ -86,16 +98,19 @@ class VelocitySchedulerTest { .repeat(Duration.ofMillis(5)) .schedule(); - runningLatch.await(); + backend.advance(50, TimeUnit.MILLISECONDS); // run first tick only (no wall clock) + assertTrue(runningLatch.await(5, TimeUnit.SECONDS)); - assertEquals(scheduler.tasksByPlugin(FakePluginManager.PLUGIN_A).size(), 1); + assertEquals(1, scheduler.tasksByPlugin(FakePluginManager.PLUGIN_A).size()); endingLatch.countDown(); } @Test void testConsumerCancel() throws Exception { - VelocityScheduler scheduler = new VelocityScheduler(new FakePluginManager()); + DeterministicSchedulerBackend backend = new DeterministicSchedulerBackend(); + VelocityScheduler scheduler = new VelocityScheduler(new FakePluginManager(), backend); + CountDownLatch latch = new CountDownLatch(1); ScheduledTask task = scheduler.buildTask( @@ -108,14 +123,17 @@ class VelocitySchedulerTest { assertEquals(TaskStatus.SCHEDULED, task.status()); - latch.await(); + backend.runUntilIdle(); // initialDelay is 0 -> due immediately in virtual time + assertTrue(latch.await(5, TimeUnit.SECONDS)); assertEquals(TaskStatus.CANCELLED, task.status()); } @Test void testConsumerEquality() throws Exception { - VelocityScheduler scheduler = new VelocityScheduler(new FakePluginManager()); + DeterministicSchedulerBackend backend = new DeterministicSchedulerBackend(); + VelocityScheduler scheduler = new VelocityScheduler(new FakePluginManager(), backend); + CountDownLatch latch = new CountDownLatch(1); AtomicReference consumerTask = new AtomicReference<>(); @@ -127,10 +145,10 @@ class VelocitySchedulerTest { }).delay(60, TimeUnit.MILLISECONDS).schedule(); initialTask.set(task); - latch.await(); + + backend.advance(60, TimeUnit.MILLISECONDS); + assertTrue(latch.await(5, TimeUnit.SECONDS)); assertEquals(consumerTask.get(), initialTask.get()); - } - -} \ No newline at end of file +}