Add experimental io_uring support

This just came off the presses, there are issues but this is for those who want to start playing with this with the risk that they could shoot themselves in the foot.
This commit is contained in:
Andrew Steinborn
2020-11-16 06:49:57 -05:00
parent 7ead4add67
commit 218b24024f
4 changed files with 37 additions and 12 deletions

View File

@@ -26,7 +26,7 @@ allprojects {
junitVersion = '5.7.0' junitVersion = '5.7.0'
slf4jVersion = '1.7.30' slf4jVersion = '1.7.30'
log4jVersion = '2.13.3' log4jVersion = '2.13.3'
nettyVersion = '4.1.52.Final' nettyVersion = '4.1.54.Final'
guavaVersion = '25.1-jre' guavaVersion = '25.1-jre'
checkerFrameworkVersion = '3.6.1' checkerFrameworkVersion = '3.6.1'
configurateVersion = '3.7.1' configurateVersion = '3.7.1'

View File

@@ -52,6 +52,7 @@ dependencies {
implementation "io.netty:netty-transport-native-epoll:${nettyVersion}" implementation "io.netty:netty-transport-native-epoll:${nettyVersion}"
implementation "io.netty:netty-transport-native-epoll:${nettyVersion}:linux-x86_64" implementation "io.netty:netty-transport-native-epoll:${nettyVersion}:linux-x86_64"
implementation "io.netty:netty-transport-native-epoll:${nettyVersion}:linux-aarch64" implementation "io.netty:netty-transport-native-epoll:${nettyVersion}:linux-aarch64"
implementation "io.netty.incubator:netty-incubator-transport-native-io_uring:0.0.1.Final:linux-x86_64"
implementation "io.netty:netty-resolver-dns:${nettyVersion}" implementation "io.netty:netty-resolver-dns:${nettyVersion}"
implementation "org.apache.logging.log4j:log4j-api:${log4jVersion}" implementation "org.apache.logging.log4j:log4j-api:${log4jVersion}"

View File

@@ -16,6 +16,7 @@ import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup; import io.netty.channel.EventLoopGroup;
import io.netty.channel.WriteBufferWaterMark; import io.netty.channel.WriteBufferWaterMark;
import io.netty.channel.epoll.EpollChannelOption; import io.netty.channel.epoll.EpollChannelOption;
import io.netty.incubator.channel.uring.IOUringChannelOption;
import io.netty.util.concurrent.GlobalEventExecutor; import io.netty.util.concurrent.GlobalEventExecutor;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.HashMap; import java.util.HashMap;
@@ -23,6 +24,7 @@ import java.util.Map;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.asynchttpclient.AsyncHttpClient; import org.asynchttpclient.AsyncHttpClient;
import org.asynchttpclient.DefaultAsyncHttpClientConfig;
import org.asynchttpclient.RequestBuilder; import org.asynchttpclient.RequestBuilder;
import org.asynchttpclient.filter.FilterContext; import org.asynchttpclient.filter.FilterContext;
import org.asynchttpclient.filter.FilterContext.FilterContextBuilder; import org.asynchttpclient.filter.FilterContext.FilterContextBuilder;
@@ -64,20 +66,23 @@ public final class ConnectionManager {
this.backendChannelInitializer = new BackendChannelInitializerHolder( this.backendChannelInitializer = new BackendChannelInitializerHolder(
new BackendChannelInitializer(this.server)); new BackendChannelInitializer(this.server));
this.resolver = new SeparatePoolInetNameResolver(GlobalEventExecutor.INSTANCE); this.resolver = new SeparatePoolInetNameResolver(GlobalEventExecutor.INSTANCE);
this.httpClient = asyncHttpClient(config()
.setEventLoopGroup(this.workerGroup) DefaultAsyncHttpClientConfig.Builder httpClientBuilder = config()
.setUserAgent(server.getVersion().getName() + "/" + server.getVersion().getVersion()) .setUserAgent(server.getVersion().getName() + "/" + server.getVersion().getVersion())
.addRequestFilter(new RequestFilter() { .addRequestFilter(new RequestFilter() {
@Override @Override
public <T> FilterContext<T> filter(FilterContext<T> ctx) { public <T> FilterContext<T> filter(FilterContext<T> ctx) {
return new FilterContextBuilder<>(ctx) return new FilterContextBuilder<>(ctx)
.request(new RequestBuilder(ctx.getRequest()) .request(ctx.getRequest().toBuilder()
.setNameResolver(resolver) .setNameResolver(resolver)
.build()) .build())
.build(); .build();
} }
}) });
.build()); if (this.transportType != TransportType.IO_URING) {
httpClientBuilder = httpClientBuilder.setEventLoopGroup(this.workerGroup);
}
this.httpClient = asyncHttpClient(httpClientBuilder.build());
} }
public void logChannelInformation() { public void logChannelInformation() {
@@ -100,8 +105,12 @@ public final class ConnectionManager {
.childOption(ChannelOption.IP_TOS, 0x18) .childOption(ChannelOption.IP_TOS, 0x18)
.localAddress(address); .localAddress(address);
if (transportType == TransportType.EPOLL && server.getConfiguration().useTcpFastOpen()) { if (server.getConfiguration().useTcpFastOpen()) {
bootstrap.option(EpollChannelOption.TCP_FASTOPEN, 3); if (transportType == TransportType.EPOLL) {
bootstrap.option(EpollChannelOption.TCP_FASTOPEN, 3);
} else if (transportType == TransportType.IO_URING) {
bootstrap.option(IOUringChannelOption.TCP_FASTOPEN, 3);
}
} }
bootstrap.bind() bootstrap.bind()
@@ -156,8 +165,12 @@ public final class ConnectionManager {
this.server.getConfiguration().getConnectTimeout()) this.server.getConfiguration().getConnectTimeout())
.group(group == null ? this.workerGroup : group) .group(group == null ? this.workerGroup : group)
.resolver(this.resolver.asGroup()); .resolver(this.resolver.asGroup());
if (transportType == TransportType.EPOLL && server.getConfiguration().useTcpFastOpen()) { if (server.getConfiguration().useTcpFastOpen()) {
bootstrap.option(EpollChannelOption.TCP_FASTOPEN_CONNECT, true); if (transportType == TransportType.EPOLL) {
bootstrap.option(EpollChannelOption.TCP_FASTOPEN_CONNECT, true);
} else if (transportType == TransportType.IO_URING) {
bootstrap.option(IOUringChannelOption.TCP_FASTOPEN_CONNECT, true);
}
} }
return bootstrap; return bootstrap;
} }

View File

@@ -15,6 +15,11 @@ import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioDatagramChannel; import io.netty.channel.socket.nio.NioDatagramChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.incubator.channel.uring.IOUring;
import io.netty.incubator.channel.uring.IOUringDatagramChannel;
import io.netty.incubator.channel.uring.IOUringEventLoopGroup;
import io.netty.incubator.channel.uring.IOUringServerSocketChannel;
import io.netty.incubator.channel.uring.IOUringSocketChannel;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
import java.util.function.BiFunction; import java.util.function.BiFunction;
@@ -26,7 +31,11 @@ enum TransportType {
EPOLL("epoll", EpollServerSocketChannel::new, EPOLL("epoll", EpollServerSocketChannel::new,
EpollSocketChannel::new, EpollSocketChannel::new,
EpollDatagramChannel::new, EpollDatagramChannel::new,
(name, type) -> new EpollEventLoopGroup(0, createThreadFactory(name, type))); (name, type) -> new EpollEventLoopGroup(0, createThreadFactory(name, type))),
IO_URING("io_uring", IOUringServerSocketChannel::new,
IOUringSocketChannel::new,
IOUringDatagramChannel::new,
(name, type) -> new IOUringEventLoopGroup(0, createThreadFactory(name, type)));
final String name; final String name;
final ChannelFactory<? extends ServerSocketChannel> serverSocketChannelFactory; final ChannelFactory<? extends ServerSocketChannel> serverSocketChannelFactory;
@@ -64,7 +73,9 @@ enum TransportType {
return NIO; return NIO;
} }
if (Epoll.isAvailable()) { if (IOUring.isAvailable()) {
return IO_URING;
} else if (Epoll.isAvailable()) {
return EPOLL; return EPOLL;
} else { } else {
return NIO; return NIO;