It sounds simple, but having spent a weekend trying to measure the performance of synchronous blocking Thrift calls versus asynchronous ones, I thought: why not build my own selector-based server and client? It seemed like an interesting exercise to write a server and client that could make 1000 concurrent requests, each transferring about 100KB of data, with as little overhead as possible. The wire format is deliberately simple: each request starts with a 4-byte big-endian length prefix that counts itself, and the server replies with a fixed payload.
So here goes:
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
@Slf4j
public class NettyServer {
private static final int SERVER_LATENCY = 1000;
private final int port;
private final boolean longlivedConnections;
private final Map<String, Long> startTimes = new ConcurrentHashMap<>(1000);
private static final Timer timer = new Timer();
public NettyServer(int port, boolean longlivedConnections) {
this.port = port;
this.longlivedConnections = longlivedConnections;
}
public void startNettyServer() {
final Map<String, ByteBuffer> messages = new ConcurrentHashMap<>(1000);
final EventLoopGroup bossGroup = new NioEventLoopGroup(2);
final EventLoopGroup workerGroup = new NioEventLoopGroup(4); // 2 seems to work as well
try {
final ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_REUSEADDR, true)
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.ALLOW_HALF_CLOSURE, true) // No effect here: option() configures the parent server channel; half closure is a child channel option (set per channel below, or via childOption())
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
ctx.channel().config().setOption(ChannelOption.ALLOW_HALF_CLOSURE, true); // Workaround: apply half closure on each accepted channel, since the bootstrap option above never reaches child channels
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
ByteBuffer byteBuffer = buf.nioBuffer();
String id = ctx.channel().id().asLongText();
// log.debug("[{}] received {} bytes", id, byteBuffer.limit());
ByteBuffer fullRequestBuffer = messages.get(id);
if (fullRequestBuffer != null) {
copyBytes(byteBuffer, 0, byteBuffer.limit(), fullRequestBuffer, fullRequestBuffer.position());
} else {
startTimes.put(id, System.currentTimeMillis());
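// First chunk of a new request: assumes the 4-byte length prefix arrived in this read; the prefix counts its own 4 bytes, hence the -4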
int len = byteBuffer.getInt(0) - 4;
log.info("[{}] New request: {} bytes", id, len);
fullRequestBuffer = ByteBuffer.allocate(len);
messages.put(id, fullRequestBuffer);
copyBytes(byteBuffer, 4, byteBuffer.limit() - 4, fullRequestBuffer, fullRequestBuffer.position());
}
if (!fullRequestBuffer.hasRemaining()) {
// Done reading all the bytes
fullRequestBuffer.flip();
processRequest(ctx, fullRequestBuffer);
log.info("[{}] Server time to last request byte: {}", id, System.currentTimeMillis() - startTimes.get(id));
messages.remove(id);
}
// All bytes have been copied out of the inbound buffer; release it to avoid a leak
buf.release();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
log.error("Exception caught: ", cause);
ctx.close();
}
});
}
});
Channel ch = b.bind("localhost", port).sync().channel();
ch.closeFuture().sync();
} catch (Exception e) {
log.error("Exception: ", e);
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
// Dummy handler: responds after SERVER_LATENCY ms to simulate server-side work.
// In a real server you'd do the heavy lifting in a separate executor service so the
// event loop threads stay unblocked (see the sketch after the listing).
protected void processRequest(ChannelHandlerContext ctx, ByteBuffer byteBuffer) {
String id = ctx.channel().id().asLongText();
timer.schedule(new TimerTask() {
@Override
public void run() {
ctx.write(Unpooled.copiedBuffer(CONTENT.asReadOnlyBuffer()));
ctx.flush();
if (!longlivedConnections) {
ctx.close();
}
log.info("[{}] Server time to first response byte: {}", id, System.currentTimeMillis() - startTimes.getOrDefault(id, 0L));
startTimes.remove(id);
}
}, SERVER_LATENCY);
}
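// Response payload; NettyTest.byteContent(...) and NUM_BYTES live in the test driver class (not shown here)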
private static final ByteBuffer CONTENT = NettyTest.byteContent(NettyTest.NUM_BYTES);
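// Copies length bytes from src starting at start into dest at the given absolute position, then advances dest's position past the copied region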
private static void copyBytes(ByteBuffer src, int start, int length, ByteBuffer dest, int position) {
for (int i = 0; i < length; i++) {
dest.put(position + i, src.get(start + i));
}
dest.position(position + length);
}
}
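Since the comment above processRequest recommends offloading real work, here is a minimal sketch of what that might look like, assuming a shared fixed-size pool. The pool size and the echo-style response are illustrative, not what the benchmark actually does:
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import java.nio.ByteBuffer;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class OffloadedHandler {
// Shared worker pool for request processing; the size here is arbitrary
private static final ExecutorService WORKERS = Executors.newFixedThreadPool(8);
protected void processRequest(ChannelHandlerContext ctx, ByteBuffer request) {
WORKERS.submit(() -> {
// ... expensive work on the request bytes would go here ...
byte[] result = new byte[request.remaining()];
request.get(result);
// writeAndFlush is safe to call from a non-event-loop thread:
// Netty marshals the write back onto the channel's event loop
ctx.writeAndFlush(Unpooled.wrappedBuffer(result));
});
}
}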
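And for completeness, this is roughly what the framing looks like from the client side, assuming the protocol the server expects (a 4-byte big-endian length prefix that counts itself) and that the server closes the connection after responding (longlivedConnections = false). This plain-NIO client, the port, and the payload size are all illustrative; the actual benchmark client isn't shown in this post:
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
public class FramingClientSketch {
public static void main(String[] args) throws Exception {
byte[] payload = new byte[100 * 1024]; // ~100KB request body
try (SocketChannel ch = SocketChannel.open(new InetSocketAddress("localhost", 8080))) {
// Frame = 4-byte big-endian length (which includes the prefix itself) + payload
ByteBuffer frame = ByteBuffer.allocate(4 + payload.length);
frame.putInt(4 + payload.length);
frame.put(payload);
frame.flip();
while (frame.hasRemaining()) {
ch.write(frame);
}
// Drain the response until the server closes the connection
ByteBuffer response = ByteBuffer.allocate(1024 * 1024);
int total = 0, n;
while ((n = ch.read(response)) >= 0) {
total += n;
response.clear(); // this sketch only counts bytes, so reuse the buffer
}
System.out.println("Received " + total + " response bytes");
}
}
}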