Friday, January 26, 2018

1000+ thrift calls at a time

Following up on where we left off (http://lawleit-l.blogspot.in/2018/01/making-lots-of-rpc-calls-in-parallel.html): how do you make tons of Thrift calls without taking the performance hit that synchronous systems cause?


Netty seems to be much more performant than Jetty for 1000+ connections and requests.


package async.http;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.util.CharsetUtil;
import io.netty.util.ReferenceCountUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.thrift.TException;
import org.apache.thrift.TProcessor;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.protocol.TProtocolFactory;
import org.apache.thrift.transport.TByteBuffer;
import org.apache.thrift.transport.TMemoryBuffer;
import org.apache.thrift.transport.TTransport;

import java.nio.ByteBuffer;
import java.util.concurrent.ExecutorService;

/**
 * Thrift server over netty
 *
 * @author Gagandeep Singh
 */
@Slf4j
public class NettyThriftServer {
    private final TProcessor processor;
    private final TProtocolFactory factory;
    private final ExecutorService executor;
    private final int port;

    public NettyThriftServer(TProcessor processor, TProtocolFactory protocolFactory,
                             ExecutorService executor, int port) {
        this.processor = processor;
        this.factory = protocolFactory;
        this.executor = executor;
        this.port = port;
    }

    public void start() {
        final EventLoopGroup bossGroup = new NioEventLoopGroup(2);
        final EventLoopGroup workerGroup = new NioEventLoopGroup(4);        // 2 seems to work as well
        try {
            final ServerBootstrap b = new ServerBootstrap();

            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_REUSEADDR, true)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .option(ChannelOption.SO_KEEPALIVE, true)
                    .option(ChannelOption.ALLOW_HALF_CLOSURE, true)         // This option doesn't work
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(io.netty.channel.socket.SocketChannel ch) throws Exception {
                            ch.pipeline().addLast("decoder", new HttpRequestDecoder());
                            ch.pipeline().addLast("aggregator", new HttpObjectAggregator(1048576));
                            ch.pipeline().addLast("request", new ChannelInboundHandlerAdapter() {
                                @Override
                                public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
                                    ctx.channel().config().setOption(ChannelOption.ALLOW_HALF_CLOSURE, true);       // Because fuck you
                                }

                                @Override
                                public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                    ByteBuf byteBuffer;
                                    if (msg instanceof FullHttpRequest) {
                                        FullHttpRequest request = (FullHttpRequest) msg;
                                        byteBuffer = request.content();
                                    } else if (msg instanceof ByteBuf) {
                                        byteBuffer = (ByteBuf) msg;
                                    } else {
                                        return;
                                    }

                                    String id = ctx.channel().id().asLongText();
                                    log.debug("[{}] received {} bytes", id, byteBuffer.capacity());

                                    // Copy the content and free up the caller buffer
                                    byte content[] = new byte[byteBuffer.capacity()];
                                    for (int i = 0; i < byteBuffer.capacity(); i++) {
                                        content[i] = byteBuffer.getByte(i);
                                    }

                                    // Submit task to executor so that the netty worker thread is freed
                                    executor.submit(() -> process(content, ctx));
                                }

                                @Override
                                public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
                                    log.error("Exception caught: {}", cause.getMessage());
                                    log.debug("Exception: ", cause);
                                    ctx.close();
                                }
                            });
                        }
                    });

            log.info("About to start netty server on port: {}", port);
            Channel ch = b.bind("localhost", port).sync().channel();
            ch.closeFuture().sync();
        } catch (Exception e) {
            log.error("Exception: ", e);
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    protected void process(byte[] content, ChannelHandlerContext ctx) {
        log.debug("netty - bytes read: {}", content);

        ByteBuffer out = ByteBuffer.allocate(200);
        TProtocol inProtocol = factory.getProtocol(new TByteBuffer(ByteBuffer.wrap(content)));
        TProtocol outProtocol = factory.getProtocol(new TByteBuffer(out));

        try {
            processor.process(inProtocol, outProtocol);
        } catch (TException e) {
            log.error("Exception: ", e);
        }

        out.flip();
        log.debug("Netty - Sending {} bytes", out.limit());
        ctx.write(Unpooled.copiedBuffer("HTTP/1.1 200 OK\nContent-Type: application/x-thrift\nContent-Length: " + out.limit() + "\n\n", CharsetUtil.UTF_8));
        ctx.write(Unpooled.copiedBuffer(out.asReadOnlyBuffer()));
        ctx.flush();
    }

    public static class TBinaryProtocolExtended extends TBinaryProtocol {
        public TBinaryProtocolExtended(TTransport trans) {
            super(trans);
        }

        public void setTransport(TTransport transport) {
            trans_ = transport;
        }
    }
}



Client making lots of calls

package async.http;

import ch.qos.logback.classic.Level;
import async.http.NettyThriftServer.TBinaryProtocolExtended;
import com.olashare.matching.common.utils.FutureUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.transport.TByteBuffer;
import org.asynchttpclient.AsyncHttpClient;
import org.asynchttpclient.DefaultAsyncHttpClient;
import org.asynchttpclient.DefaultAsyncHttpClientConfig;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/**
 * Thrift over async http.
 *
 * @author Gagandeep Singh
 */
@Slf4j
public class ThriftHttpPerf {
    public static class SyncServer implements Iface {
        @Override
        public ByteBuffer testApi(short sleepTimeMs, ByteBuffer input) throws TException {
            try {
                Thread.sleep(sleepTimeMs);
            } catch (Exception e) {
                // Ignore
                log.error("Exception: ", e);
            }
            return input;
        }
    }

    static void startHttpServer(int port, int clientTimeout) {
        SyncServer syncServer = new SyncServer();
        OptimalAssignmentService.Processor processor = new OptimalAssignmentService.Processor<>(syncServer);
        TBinaryProtocol.Factory factory = new TBinaryProtocol.Factory();
        ExecutorService executor = Executors.newFixedThreadPool(1100);

        NettyThriftServer server = new NettyThriftServer(processor, factory, executor, port);
        server.start();
    }


    public static void main(String args[]) throws Exception {
        ch.qos.logback.classic.Logger root = (ch.qos.logback.classic.Logger) org.slf4j.LoggerFactory.getLogger(ch.qos.logback.classic.Logger.ROOT_LOGGER_NAME);
        root.setLevel(Level.INFO);

        new Thread(() -> startHttpServer(9997, 1000)).start();
        Thread.sleep(1000);

        AsyncHttpClient asyncHttpClient = new DefaultAsyncHttpClient(new DefaultAsyncHttpClientConfig.Builder()
                .setSoReuseAddress(true)
                .setKeepAlive(true)
                .setReadTimeout(1000)
                .setConnectTimeout(100)
                .build());

        // Thrift async
        String url = "http://localhost:9997";

        int numIterations = 200;
        int parallelism = 1000;
        String content = "Hello";

        for (int i = 0; i < numIterations; i++) {
            long now = System.currentTimeMillis();

            List<Client> clients = new ArrayList<>();
            List<TBinaryProtocolExtended> protocols = new ArrayList<>();
            List<Future<ByteBuffer>> futures = new ArrayList<>(parallelism);

            for (int j = 0; j < parallelism; j++) {
                AsyncThriftHttpClient tHttpClient = new AsyncThriftHttpClient(url, asyncHttpClient);
                TBinaryProtocolExtended tProtocol = new TBinaryProtocolExtended(tHttpClient);
                OptimalAssignmentService.Client client = new OptimalAssignmentService.Client(tProtocol);

                client.send_testApi((short) 100, ByteBuffer.wrap(content.getBytes()));
                futures.add(tHttpClient.getFuture());
                protocols.add(tProtocol);
                clients.add(client);
            }

            // Wait for full response
            List<ByteBuffer> responses = FutureUtil.waitForAllFutures(futures);

            for (int j = 0; j < parallelism; j++) {
                TBinaryProtocolExtended tProtocol = protocols.get(j);
                Client client = clients.get(j);
                tProtocol.setTransport(new TByteBuffer(responses.get(j)));

                ByteBuffer buffer = client.recv_testApi();
                assert buffer.limit() == content.length();
            }

            log.info("Time taken in testApi: {}", System.currentTimeMillis() - now);
        }

        System.exit(0);
    }
}

No comments:

Post a Comment