Introduce SchedulerBackend to fix VelocitySchedulerTest intermittent failure (#1728)

This commit is contained in:
Wouter Gritter
2026-02-11 22:21:03 +01:00
committed by GitHub
parent 2535751cd9
commit c2fd3c07ac
5 changed files with 433 additions and 32 deletions

View File

@@ -0,0 +1,70 @@
/*
* Copyright (C) 2018-2026 Velocity Contributors
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package com.velocitypowered.proxy.scheduler;
import static com.google.common.base.Preconditions.checkNotNull;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
/**
* A {@link SchedulerBackend} backed by a real {@link ScheduledExecutorService}.
*/
public class ExecutorSchedulerBackend implements SchedulerBackend {
private final ScheduledExecutorService executor;
/**
* Creates a ExecutorSchedulerBackend with a default executor.
*/
public ExecutorSchedulerBackend() {
this(Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("Velocity Task Scheduler Timer")
.build()
));
}
/**
* Creates a ExecutorSchedulerBackend with a given executor.
*
* @param executor The executor to use.
*/
public ExecutorSchedulerBackend(ScheduledExecutorService executor) {
this.executor = checkNotNull(executor, "executor");
}
@Override
public ScheduledFuture<?> schedule(Runnable task, long delay, TimeUnit unit) {
return executor.schedule(task, delay, unit);
}
@Override
public ScheduledFuture<?> scheduleAtFixedRate(Runnable task, long initialDelay, long period, TimeUnit unit) {
return executor.scheduleAtFixedRate(task, initialDelay, period, unit);
}
@Override
public void shutdown() {
executor.shutdown();
}
}

View File

@@ -0,0 +1,56 @@
/*
* Copyright (C) 2018-2026 Velocity Contributors
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package com.velocitypowered.proxy.scheduler;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
/**
* Backend interface used by {@link VelocityScheduler} to schedule timer callbacks.
*
* <p>This is an internal abstraction that allows tests to replace the real-time scheduler
* with a deterministic implementation.
*/
interface SchedulerBackend {
/**
* Schedules a task to run once after the given delay.
*
* @param task the task to run
* @param delay the delay
* @param unit the delay unit
* @return a future representing the scheduled task
*/
ScheduledFuture<?> schedule(Runnable task, long delay, TimeUnit unit);
/**
* Schedules a task to run at a fixed rate.
*
* @param task the task to run
* @param initialDelay the initial delay
* @param period the period between runs
* @param unit the time unit
* @return a future representing the scheduled task
*/
ScheduledFuture<?> scheduleAtFixedRate(Runnable task, long initialDelay, long period, TimeUnit unit);
/**
* Shuts down the backend.
*/
void shutdown();
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright (C) 2018-2023 Velocity Contributors
* Copyright (C) 2018-2026 Velocity Contributors
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
@@ -23,7 +23,6 @@ import static com.google.common.base.Preconditions.checkNotNull;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Multimap;
import com.google.common.collect.Multimaps;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.velocitypowered.api.plugin.PluginContainer;
import com.velocitypowered.api.plugin.PluginManager;
import com.velocitypowered.api.scheduler.ScheduledTask;
@@ -40,7 +39,6 @@ import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
@@ -60,20 +58,23 @@ import org.jetbrains.annotations.VisibleForTesting;
public class VelocityScheduler implements Scheduler {
private final PluginManager pluginManager;
private final ScheduledExecutorService timerExecutionService;
private final SchedulerBackend backend;
private final Multimap<Object, ScheduledTask> tasksByPlugin = Multimaps.synchronizedMultimap(
Multimaps.newSetMultimap(new IdentityHashMap<>(), HashSet::new));
/**
* Initalizes the scheduler.
* Initializes the scheduler.
*
* @param pluginManager the Velocity plugin manager
*/
public VelocityScheduler(PluginManager pluginManager) {
this(pluginManager, new ExecutorSchedulerBackend());
}
@VisibleForTesting
VelocityScheduler(PluginManager pluginManager, SchedulerBackend backend) {
this.pluginManager = pluginManager;
this.timerExecutionService = Executors
.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat("Velocity Task Scheduler Timer").build());
this.backend = backend;
}
@Override
@@ -118,7 +119,7 @@ public class VelocityScheduler implements Scheduler {
for (ScheduledTask task : terminating) {
task.cancel();
}
timerExecutionService.shutdown();
backend.shutdown();
final List<PluginContainer> plugins = new ArrayList<>(this.pluginManager.getPlugins());
final Iterator<PluginContainer> pluginIterator = plugins.iterator();
while (pluginIterator.hasNext()) {
@@ -232,10 +233,9 @@ public class VelocityScheduler implements Scheduler {
void schedule() {
if (repeat == 0) {
this.future = timerExecutionService.schedule(this, delay, TimeUnit.MILLISECONDS);
this.future = backend.schedule(this, delay, TimeUnit.MILLISECONDS);
} else {
this.future = timerExecutionService
.scheduleAtFixedRate(this, delay, repeat, TimeUnit.MILLISECONDS);
this.future = backend.scheduleAtFixedRate(this, delay, repeat, TimeUnit.MILLISECONDS);
}
}

View File

@@ -0,0 +1,257 @@
/*
* Copyright (C) 2018-2026 Velocity Contributors
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package com.velocitypowered.proxy.scheduler;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import java.time.Duration;
import java.util.PriorityQueue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Delayed;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
/**
* A deterministic {@link SchedulerBackend} for tests.
*
* <p>This backend does not use the wall clock. Tests manually advance time, and all due tasks
* are executed deterministically on the calling thread.
*/
class DeterministicSchedulerBackend implements SchedulerBackend {
private final Object lock = new Object();
private final PriorityQueue<Entry> queue = new PriorityQueue<>();
private boolean shutdown;
private long nowNanos;
private long seq;
@Override
public ScheduledFuture<?> schedule(Runnable task, long delay, TimeUnit unit) {
checkNotNull(task, "task");
checkNotNull(unit, "unit");
return enqueue(task, unit.toNanos(delay), 0);
}
@Override
public ScheduledFuture<?> scheduleAtFixedRate(Runnable task, long initialDelay, long period, TimeUnit unit) {
checkNotNull(task, "task");
checkNotNull(unit, "unit");
checkArgument(period > 0, "period must be > 0");
return enqueue(task, unit.toNanos(initialDelay), unit.toNanos(period));
}
@Override
public void shutdown() {
synchronized (lock) {
shutdown = true;
queue.clear();
}
}
/**
* Runs all tasks that are due "now" without advancing time.
*/
void runUntilIdle() {
drainDueTasks();
}
/**
* Advances virtual time and runs all tasks that become due.
*
* @param duration the amount of time to advance
*/
void advance(Duration duration) {
checkNotNull(duration, "duration");
advance(duration.toNanos());
}
/**
* Advances virtual time and runs all tasks that become due.
*
* @param time the time to advance
* @param unit the unit
*/
void advance(long time, TimeUnit unit) {
checkNotNull(unit, "unit");
advance(unit.toNanos(time));
}
private void advance(long nanos) {
if (nanos < 0) {
throw new IllegalArgumentException("nanos must be >= 0");
}
synchronized (lock) {
nowNanos += nanos;
}
drainDueTasks();
}
private ScheduledFuture<?> enqueue(Runnable task, long delayNanos, long periodNanos) {
synchronized (lock) {
if (shutdown) {
throw new java.util.concurrent.RejectedExecutionException("backend is shut down");
}
Entry entry = new Entry(task, nowNanos + Math.max(0, delayNanos), periodNanos, seq++);
entry.future = new FutureImpl(entry);
queue.add(entry);
return entry.future;
}
}
private void drainDueTasks() {
while (true) {
Entry entry;
synchronized (lock) {
entry = queue.peek();
if (entry == null || entry.nextRunNanos > nowNanos) {
return;
}
queue.poll();
}
// Run outside the lock to avoid deadlocks if tasks schedule more work.
if (!entry.future.isCancelled()) {
try {
entry.task.run();
} finally {
// no-op
}
}
synchronized (lock) {
if (entry.future.isCancelled()) {
// Cancelled tasks are not re-queued.
continue;
}
if (entry.periodNanos == 0) {
entry.future.complete();
} else {
// Fixed-rate semantics: next run time is based on the scheduled time, not completion time.
entry.nextRunNanos = entry.nextRunNanos + entry.periodNanos;
queue.add(entry);
}
}
}
}
private final class FutureImpl implements ScheduledFuture<Object> {
private final Entry entry;
private final CountDownLatch completion = new CountDownLatch(1);
private volatile boolean cancelled;
private volatile boolean done;
private FutureImpl(Entry entry) {
this.entry = entry;
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
synchronized (lock) {
if (done) {
return false;
}
cancelled = true;
done = true;
queue.remove(entry);
}
completion.countDown();
return true;
}
@Override
public boolean isCancelled() {
return cancelled;
}
@Override
public boolean isDone() {
return done;
}
void complete() {
if (!done) {
done = true;
completion.countDown();
}
}
@Override
public Object get() throws InterruptedException {
completion.await();
if (cancelled) {
throw new CancellationException();
}
return null;
}
@Override
public Object get(long timeout, TimeUnit unit) throws InterruptedException, java.util.concurrent.TimeoutException {
if (!completion.await(timeout, unit)) {
throw new java.util.concurrent.TimeoutException();
}
if (cancelled) {
throw new CancellationException();
}
return null;
}
@Override
public long getDelay(TimeUnit unit) {
synchronized (lock) {
long remaining = Math.max(0, entry.nextRunNanos - nowNanos);
return unit.convert(remaining, TimeUnit.NANOSECONDS);
}
}
@Override
public int compareTo(Delayed o) {
long d1 = getDelay(TimeUnit.NANOSECONDS);
long d2 = o.getDelay(TimeUnit.NANOSECONDS);
return Long.compare(d1, d2);
}
}
private static final class Entry implements Comparable<Entry> {
private final Runnable task;
private final long periodNanos;
private final long sequence;
private long nextRunNanos;
private FutureImpl future;
private Entry(Runnable task, long nextRunNanos, long periodNanos, long sequence) {
this.task = task;
this.nextRunNanos = nextRunNanos;
this.periodNanos = periodNanos;
this.sequence = sequence;
}
@Override
public int compareTo(Entry other) {
int cmp = Long.compare(this.nextRunNanos, other.nextRunNanos);
if (cmp != 0) {
return cmp;
}
return Long.compare(this.sequence, other.sequence);
}
}
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright (C) 2018-2023 Velocity Contributors
* Copyright (C) 2018-2026 Velocity Contributors
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
@@ -18,6 +18,7 @@
package com.velocitypowered.proxy.scheduler;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import com.velocitypowered.api.scheduler.ScheduledTask;
import com.velocitypowered.api.scheduler.TaskStatus;
@@ -31,22 +32,26 @@ import java.util.concurrent.atomic.AtomicReference;
import org.junit.jupiter.api.Test;
class VelocitySchedulerTest {
// TODO: The timings here will be inaccurate on slow systems.
@Test
void buildTask() throws Exception {
VelocityScheduler scheduler = new VelocityScheduler(new FakePluginManager());
DeterministicSchedulerBackend backend = new DeterministicSchedulerBackend();
VelocityScheduler scheduler = new VelocityScheduler(new FakePluginManager(), backend);
CountDownLatch latch = new CountDownLatch(1);
ScheduledTask task = scheduler.buildTask(FakePluginManager.PLUGIN_A, latch::countDown)
.schedule();
latch.await();
ScheduledTask task = scheduler.buildTask(FakePluginManager.PLUGIN_A, latch::countDown).schedule();
backend.runUntilIdle(); // runs tasks due at t=0
assertTrue(latch.await(5, TimeUnit.SECONDS));
((VelocityTask) task).awaitCompletion();
assertEquals(TaskStatus.FINISHED, task.status());
}
@Test
void cancelWorks() throws Exception {
VelocityScheduler scheduler = new VelocityScheduler(new FakePluginManager());
void cancelWorks() {
DeterministicSchedulerBackend backend = new DeterministicSchedulerBackend();
VelocityScheduler scheduler = new VelocityScheduler(new FakePluginManager(), backend);
AtomicInteger i = new AtomicInteger(3);
ScheduledTask task = scheduler.buildTask(FakePluginManager.PLUGIN_A, i::decrementAndGet)
.delay(100, TimeUnit.SECONDS)
@@ -58,19 +63,26 @@ class VelocitySchedulerTest {
@Test
void repeatTaskWorks() throws Exception {
VelocityScheduler scheduler = new VelocityScheduler(new FakePluginManager());
DeterministicSchedulerBackend backend = new DeterministicSchedulerBackend();
VelocityScheduler scheduler = new VelocityScheduler(new FakePluginManager(), backend);
CountDownLatch latch = new CountDownLatch(3);
ScheduledTask task = scheduler.buildTask(FakePluginManager.PLUGIN_A, latch::countDown)
.delay(100, TimeUnit.MILLISECONDS)
.repeat(100, TimeUnit.MILLISECONDS)
.schedule();
latch.await();
backend.advance(300, TimeUnit.MILLISECONDS); // triggers 3 timer firings deterministically
assertTrue(latch.await(5, TimeUnit.SECONDS));
task.cancel();
}
@Test
void obtainTasksFromPlugin() throws Exception {
VelocityScheduler scheduler = new VelocityScheduler(new FakePluginManager());
DeterministicSchedulerBackend backend = new DeterministicSchedulerBackend();
VelocityScheduler scheduler = new VelocityScheduler(new FakePluginManager(), backend);
CountDownLatch runningLatch = new CountDownLatch(1);
CountDownLatch endingLatch = new CountDownLatch(1);
@@ -86,16 +98,19 @@ class VelocitySchedulerTest {
.repeat(Duration.ofMillis(5))
.schedule();
runningLatch.await();
backend.advance(50, TimeUnit.MILLISECONDS); // run first tick only (no wall clock)
assertTrue(runningLatch.await(5, TimeUnit.SECONDS));
assertEquals(scheduler.tasksByPlugin(FakePluginManager.PLUGIN_A).size(), 1);
assertEquals(1, scheduler.tasksByPlugin(FakePluginManager.PLUGIN_A).size());
endingLatch.countDown();
}
@Test
void testConsumerCancel() throws Exception {
VelocityScheduler scheduler = new VelocityScheduler(new FakePluginManager());
DeterministicSchedulerBackend backend = new DeterministicSchedulerBackend();
VelocityScheduler scheduler = new VelocityScheduler(new FakePluginManager(), backend);
CountDownLatch latch = new CountDownLatch(1);
ScheduledTask task = scheduler.buildTask(
@@ -108,14 +123,17 @@ class VelocitySchedulerTest {
assertEquals(TaskStatus.SCHEDULED, task.status());
latch.await();
backend.runUntilIdle(); // initialDelay is 0 -> due immediately in virtual time
assertTrue(latch.await(5, TimeUnit.SECONDS));
assertEquals(TaskStatus.CANCELLED, task.status());
}
@Test
void testConsumerEquality() throws Exception {
VelocityScheduler scheduler = new VelocityScheduler(new FakePluginManager());
DeterministicSchedulerBackend backend = new DeterministicSchedulerBackend();
VelocityScheduler scheduler = new VelocityScheduler(new FakePluginManager(), backend);
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<ScheduledTask> consumerTask = new AtomicReference<>();
@@ -127,10 +145,10 @@ class VelocitySchedulerTest {
}).delay(60, TimeUnit.MILLISECONDS).schedule();
initialTask.set(task);
latch.await();
backend.advance(60, TimeUnit.MILLISECONDS);
assertTrue(latch.await(5, TimeUnit.SECONDS));
assertEquals(consumerTask.get(), initialTask.get());
}
}
}