Friday, January 26, 2018

1000+ Thrift calls at a time

Following up on where we left off in the previous post (http://lawleit-l.blogspot.in/2018/01/making-lots-of-rpc-calls-in-parallel.html): how do you make tons of Thrift calls without taking the performance hit that synchronous systems cause?


Netty seems to be much more performant than Jetty for 1000+ connections and requests.


package async.http;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.util.CharsetUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.thrift.TException;
import org.apache.thrift.TProcessor;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.protocol.TProtocolFactory;
import org.apache.thrift.transport.TByteBuffer;
import org.apache.thrift.transport.TTransport;

import java.nio.ByteBuffer;
import java.util.concurrent.ExecutorService;

/**
 * Thrift server over netty
 *
 * @author Gagandeep Singh
 */
@Slf4j
public class NettyThriftServer {
    private final TProcessor processor;
    private final TProtocolFactory factory;
    private final ExecutorService executor;
    private final int port;

    public NettyThriftServer(TProcessor processor, TProtocolFactory protocolFactory,
                             ExecutorService executor, int port) {
        this.processor = processor;
        this.factory = protocolFactory;
        this.executor = executor;
        this.port = port;
    }

    public void start() {
        final EventLoopGroup bossGroup = new NioEventLoopGroup(2);
        final EventLoopGroup workerGroup = new NioEventLoopGroup(4);        // 2 seems to work as well
        try {
            final ServerBootstrap b = new ServerBootstrap();

            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_REUSEADDR, true)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .option(ChannelOption.SO_KEEPALIVE, true)
                    .option(ChannelOption.ALLOW_HALF_CLOSURE, true)         // This option doesn't work
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(io.netty.channel.socket.SocketChannel ch) throws Exception {
                            ch.pipeline().addLast("decoder", new HttpRequestDecoder());
                            ch.pipeline().addLast("aggregator", new HttpObjectAggregator(1048576));
                            ch.pipeline().addLast("request", new ChannelInboundHandlerAdapter() {
                                @Override
                                public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
                                    ctx.channel().config().setOption(ChannelOption.ALLOW_HALF_CLOSURE, true);       // Because fuck you
                                }

                                @Override
                                public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                    ByteBuf byteBuffer;
                                    if (msg instanceof FullHttpRequest) {
                                        FullHttpRequest request = (FullHttpRequest) msg;
                                        byteBuffer = request.content();
                                    } else if (msg instanceof ByteBuf) {
                                        byteBuffer = (ByteBuf) msg;
                                    } else {
                                        return;
                                    }

                                    String id = ctx.channel().id().asLongText();
                                    log.debug("[{}] received {} bytes", id, byteBuffer.capacity());

                                    // Copy the content and free up the caller buffer
                                    byte content[] = new byte[byteBuffer.capacity()];
                                    for (int i = 0; i < byteBuffer.capacity(); i++) {
                                        content[i] = byteBuffer.getByte(i);
                                    }

                                    // Submit task to executor so that the netty worker thread is freed
                                    executor.submit(() -> process(content, ctx));
                                }

                                @Override
                                public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
                                    log.error("Exception caught: {}", cause.getMessage());
                                    log.debug("Exception: ", cause);
                                    ctx.close();
                                }
                            });
                        }
                    });

            log.info("About to start netty server on port: {}", port);
            Channel ch = b.bind("localhost", port).sync().channel();
            ch.closeFuture().sync();
        } catch (Exception e) {
            log.error("Exception: ", e);
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    protected void process(byte[] content, ChannelHandlerContext ctx) {
        log.debug("netty - bytes read: {}", content);

        ByteBuffer out = ByteBuffer.allocate(200);
        TProtocol inProtocol = factory.getProtocol(new TByteBuffer(ByteBuffer.wrap(content)));
        TProtocol outProtocol = factory.getProtocol(new TByteBuffer(out));

        try {
            processor.process(inProtocol, outProtocol);
        } catch (TException e) {
            log.error("Exception: ", e);
        }

        out.flip();
        log.debug("Netty - Sending {} bytes", out.limit());
        ctx.write(Unpooled.copiedBuffer("HTTP/1.1 200 OK\nContent-Type: application/x-thrift\nContent-Length: " + out.limit() + "\n\n", CharsetUtil.UTF_8));
        ctx.write(Unpooled.copiedBuffer(out.asReadOnlyBuffer()));
        ctx.flush();
    }

    public static class TBinaryProtocolExtended extends TBinaryProtocol {
        public TBinaryProtocolExtended(TTransport trans) {
            super(trans);
        }

        public void setTransport(TTransport transport) {
            trans_ = transport;
        }
    }
}



Client making lots of calls

package async.http;

import ch.qos.logback.classic.Level;
import async.http.NettyThriftServer.TBinaryProtocolExtended;
import com.olashare.matching.common.utils.FutureUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.transport.TByteBuffer;
import org.asynchttpclient.AsyncHttpClient;
import org.asynchttpclient.DefaultAsyncHttpClient;
import org.asynchttpclient.DefaultAsyncHttpClientConfig;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/**
 * Thrift over async http.
 *
 * @author Gagandeep Singh
 */
@Slf4j
public class ThriftHttpPerf {
    public static class SyncServer implements Iface {
        @Override
        public ByteBuffer testApi(short sleepTimeMs, ByteBuffer input) throws TException {
            try {
                Thread.sleep(sleepTimeMs);
            } catch (Exception e) {
                // Ignore
                log.error("Exception: ", e);
            }
            return input;
        }
    }

    static void startHttpServer(int port, int clientTimeout) {
        SyncServer syncServer = new SyncServer();
        OptimalAssignmentService.Processor processor = new OptimalAssignmentService.Processor<>(syncServer);
        TBinaryProtocol.Factory factory = new TBinaryProtocol.Factory();
        ExecutorService executor = Executors.newFixedThreadPool(1100);

        NettyThriftServer server = new NettyThriftServer(processor, factory, executor, port);
        server.start();
    }


    public static void main(String args[]) throws Exception {
        ch.qos.logback.classic.Logger root = (ch.qos.logback.classic.Logger) org.slf4j.LoggerFactory.getLogger(ch.qos.logback.classic.Logger.ROOT_LOGGER_NAME);
        root.setLevel(Level.INFO);

        new Thread(() -> startHttpServer(9997, 1000)).start();
        Thread.sleep(1000);

        AsyncHttpClient asyncHttpClient = new DefaultAsyncHttpClient(new DefaultAsyncHttpClientConfig.Builder()
                .setSoReuseAddress(true)
                .setKeepAlive(true)
                .setReadTimeout(1000)
                .setConnectTimeout(100)
                .build());

        // Thrift async
        String url = "http://localhost:9997";

        int numIterations = 200;
        int parallelism = 1000;
        String content = "Hello";

        for (int i = 0; i < numIterations; i++) {
            long now = System.currentTimeMillis();

            List<Client> clients = new ArrayList<>();
            List<TBinaryProtocolExtended> protocols = new ArrayList<>();
            List<Future<ByteBuffer>> futures = new ArrayList<>(parallelism);

            for (int j = 0; j < parallelism; j++) {
                AsyncThriftHttpClient tHttpClient = new AsyncThriftHttpClient(url, asyncHttpClient);
                TBinaryProtocolExtended tProtocol = new TBinaryProtocolExtended(tHttpClient);
                OptimalAssignmentService.Client client = new OptimalAssignmentService.Client(tProtocol);

                client.send_testApi((short) 100, ByteBuffer.wrap(content.getBytes()));
                futures.add(tHttpClient.getFuture());
                protocols.add(tProtocol);
                clients.add(client);
            }

            // Wait for full response
            List<ByteBuffer> responses = FutureUtil.waitForAllFutures(futures);

            for (int j = 0; j < parallelism; j++) {
                TBinaryProtocolExtended tProtocol = protocols.get(j);
                Client client = clients.get(j);
                tProtocol.setTransport(new TByteBuffer(responses.get(j)));

                ByteBuffer buffer = client.recv_testApi();
                assert buffer.limit() == content.length();
            }

            log.info("Time taken in testApi: {}", System.currentTimeMillis() - now);
        }

        System.exit(0);
    }
}

Sunday, January 7, 2018

Making lots of RPC calls in parallel



It sounds simple. But after spending a weekend trying to measure the performance of synchronous (blocking) Thrift calls against asynchronous ones, I thought: why not build my own select-based HTTP server and client?

I wanted to see whether I could write a server and client that make 1,000 concurrent requests transferring about 100 KB of data, with as little overhead as possible.

So here goes:

@Slf4j
public class NettyServer {
    private static final int SERVER_LATENCY = 1000;

    private final int port;
    private final boolean longlivedConnections;
    private final Map<String, Long> startTimes = new ConcurrentHashMap<>(1000);
    private static final Timer timer = new Timer();

    public NettyServer(int port, boolean longlivedConnections) {
        this.port = port;
        this.longlivedConnections = longlivedConnections;
    }

    public void startNettyServer() {
        final Map<String, ByteBuffer> messages = new ConcurrentHashMap<>(1000);

        final EventLoopGroup bossGroup = new NioEventLoopGroup(2);
        final EventLoopGroup workerGroup = new NioEventLoopGroup(4);        // 2 seems to work as well
        try {
            final ServerBootstrap b = new ServerBootstrap();

            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_REUSEADDR, true)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .option(ChannelOption.SO_KEEPALIVE, true)
                    .option(ChannelOption.ALLOW_HALF_CLOSURE, true)         // This option doesn't work
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(io.netty.channel.socket.SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                                @Override
                                public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
                                    ctx.channel().config().setOption(ChannelOption.ALLOW_HALF_CLOSURE, true);       // Because fuck you
                                }

                                @Override
                                public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                    ByteBuffer byteBuffer = ((ByteBuf) msg).nioBuffer();
                                    String id = ctx.channel().id().asLongText();
                                    // log.debug("[{}] received {} bytes", id, byteBuffer.limit());

                                    ByteBuffer fullRequestBuffer = messages.get(id);
                                    if (fullRequestBuffer != null) {
                                        copyBytes(byteBuffer, 0, byteBuffer.capacity(), fullRequestBuffer, fullRequestBuffer.position());
                                    } else {
                                        startTimes.put(id, System.currentTimeMillis());
                                        int len = byteBuffer.getInt(0) - 4;

                                        log.info("[{}] New request: {} bytes", id, len);
                                        fullRequestBuffer = ByteBuffer.allocate(len);
                                        messages.put(id, fullRequestBuffer);

                                        copyBytes(byteBuffer, 4, byteBuffer.capacity() - 4, fullRequestBuffer, fullRequestBuffer.position());
                                    }

                                    if (!fullRequestBuffer.hasRemaining()) {
                                        // Done reading all the bytes
                                        fullRequestBuffer.flip();
                                        processRequest(ctx, fullRequestBuffer);

                                        log.info("[{}] Server time to last request byte: {}", id, System.currentTimeMillis() - startTimes.get(id));

                                        messages.remove(id);
                                    }
                                }

                                @Override
                                public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
                                    log.error("Exception caught: ", cause);
                                    ctx.close();
                                }
                            });
                        }
                    });

            Channel ch = b.bind("localhost", port).sync().channel();
            ch.closeFuture().sync();
        } catch (Exception e) {
            log.error("Exception: ", e);
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    // Dummy handler. Sends response after 1 second
    // Do heavy lifting in a separate executor service so that the event loop thread is unblocked
    protected void processRequest(ChannelHandlerContext ctx, ByteBuffer byteBuffer) {
        String id = ctx.channel().id().asLongText();

        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                ctx.write(Unpooled.copiedBuffer(CONTENT.asReadOnlyBuffer()));
                ctx.flush();
                if (!longlivedConnections) {
                    ctx.close();
                }

                log.info("[{}] Server time to first response byte: {}", id, System.currentTimeMillis() - startTimes.getOrDefault(id, 0L));
                startTimes.remove(id);
            }
        }, SERVER_LATENCY);
    }
    private static final ByteBuffer CONTENT = NettyTest.byteContent(NettyTest.NUM_BYTES);

    private static void copyBytes(ByteBuffer src, int start, int length, ByteBuffer dest, int position) {
        for (int i = 0; i < length; i++) {
            dest.put(position + i, src.get(start + i));
        }
        dest.position(position + length);
    }
}