Sunday, January 7, 2018

Making lots of RPC calls in parallel



It sounds simple, but having spent a weekend trying to measure the performance of sync blocking thrift calls vs async, I thought why not build my own select based http server and client.

I thought it would be interesting if I could write a server and client which could make 1000 concurrent requests transferring about 100KB of data with as little overhead as possible.

So here goes:

@Slf4j
public class NettyServer {
    private static final int SERVER_LATENCY = 1000;

    private final int port;
    private final boolean longlivedConnections;
    private final Map<String, Long> startTimes = new ConcurrentHashMap<>(1000);
    private static final Timer timer = new Timer();

    public NettyServer(int port, boolean longlivedConnections) {
        this.port = port;
        this.longlivedConnections = longlivedConnections;
    }

    public void startNettyServer() {
        final Map<String, ByteBuffer> messages = new ConcurrentHashMap<>(1000);

        final EventLoopGroup bossGroup = new NioEventLoopGroup(2);
        final EventLoopGroup workerGroup = new NioEventLoopGroup(4);        // 2 seems to work as well
        try {
            final ServerBootstrap b = new ServerBootstrap();

            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_REUSEADDR, true)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .option(ChannelOption.SO_KEEPALIVE, true)
                    .option(ChannelOption.ALLOW_HALF_CLOSURE, true)         // This option doesn't work
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(io.netty.channel.socket.SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                                @Override
                                public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
                                    ctx.channel().config().setOption(ChannelOption.ALLOW_HALF_CLOSURE, true);       // Because fuck you
                                }

                                @Override
                                public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                    ByteBuffer byteBuffer = ((ByteBuf) msg).nioBuffer();
                                    String id = ctx.channel().id().asLongText();
                                    // log.debug("[{}] received {} bytes", id, byteBuffer.limit());

                                    ByteBuffer fullRequestBuffer = messages.get(id);
                                    if (fullRequestBuffer != null) {
                                        copyBytes(byteBuffer, 0, byteBuffer.capacity(), fullRequestBuffer, fullRequestBuffer.position());
                                    } else {
                                        startTimes.put(id, System.currentTimeMillis());
                                        int len = byteBuffer.getInt(0) - 4;

                                        log.info("[{}] New request: {} bytes", id, len);
                                        fullRequestBuffer = ByteBuffer.allocate(len);
                                        messages.put(id, fullRequestBuffer);

                                        copyBytes(byteBuffer, 4, byteBuffer.capacity() - 4, fullRequestBuffer, fullRequestBuffer.position());
                                    }

                                    if (!fullRequestBuffer.hasRemaining()) {
                                        // Done reading all the bytes
                                        fullRequestBuffer.flip();
                                        processRequest(ctx, fullRequestBuffer);

                                        log.info("[{}] Server time to last request byte: {}", id, System.currentTimeMillis() - startTimes.get(id));

                                        messages.remove(id);
                                    }
                                }

                                @Override
                                public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
                                    log.error("Exception caught: ", cause);
                                    ctx.close();
                                }
                            });
                        }
                    });

            Channel ch = b.bind("localhost", port).sync().channel();
            ch.closeFuture().sync();
        } catch (Exception e) {
            log.error("Exception: ", e);
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    // Dummy handler. Sends response after 1 second
    // Do heavy lifting in a separate executor service so that the event loop thread is unblocked
    protected void processRequest(ChannelHandlerContext ctx, ByteBuffer byteBuffer) {
        String id = ctx.channel().id().asLongText();

        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                ctx.write(Unpooled.copiedBuffer(CONTENT.asReadOnlyBuffer()));
                ctx.flush();
                if (!longlivedConnections) {
                    ctx.close();
                }

                log.info("[{}] Server time to first response byte: {}", id, System.currentTimeMillis() - startTimes.getOrDefault(id, 0L));
                startTimes.remove(id);
            }
        }, SERVER_LATENCY);
    }
    private static final ByteBuffer CONTENT = NettyTest.byteContent(NettyTest.NUM_BYTES);

    private static void copyBytes(ByteBuffer src, int start, int length, ByteBuffer dest, int position) {
        for (int i = 0; i < length; i++) {
            dest.put(position + i, src.get(start + i));
        }
        dest.position(position + length);
    }
}

No comments:

Post a Comment