Following up on where we left off (http://lawleit-l.blogspot.in/2018/01/making-lots-of-rpc-calls-in-parallel.html): how do you make tons of Thrift calls without taking the performance hit that synchronous systems cause?
Netty seems to be much more performant than Jetty for 1000+ concurrent connections and requests, so the server side here is a thin Thrift-over-HTTP handler on top of Netty.
package async.http;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.util.CharsetUtil;
import io.netty.util.ReferenceCountUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.thrift.TException;
import org.apache.thrift.TProcessor;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.protocol.TProtocolFactory;
import org.apache.thrift.transport.TByteBuffer;
import org.apache.thrift.transport.TTransport;
import java.nio.ByteBuffer;
import java.util.concurrent.ExecutorService;
/**
* Thrift server over netty
*
* @author Gagandeep Singh
*/
@Slf4j
public class NettyThriftServer {
private final TProcessor processor;
private final TProtocolFactory factory;
private final ExecutorService executor;
private final int port;
public NettyThriftServer(TProcessor processor, TProtocolFactory protocolFactory,
ExecutorService executor, int port) {
this.processor = processor;
this.factory = protocolFactory;
this.executor = executor;
this.port = port;
}
public void start() {
final EventLoopGroup bossGroup = new NioEventLoopGroup(2);
final EventLoopGroup workerGroup = new NioEventLoopGroup(4); // 2 seems to work as well
try {
final ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_REUSEADDR, true)
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.ALLOW_HALF_CLOSURE, true) // Applies to the server channel only; accepted connections don't inherit it, hence the per-channel override below
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast("decoder", new HttpRequestDecoder());
ch.pipeline().addLast("aggregator", new HttpObjectAggregator(1048576));
ch.pipeline().addLast("request", new ChannelInboundHandlerAdapter() {
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
ctx.channel().config().setOption(ChannelOption.ALLOW_HALF_CLOSURE, true); // The bootstrap option above doesn't reach child channels, so set it on each accepted channel
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf byteBuffer;
if (msg instanceof FullHttpRequest) {
FullHttpRequest request = (FullHttpRequest) msg;
byteBuffer = request.content();
} else if (msg instanceof ByteBuf) {
byteBuffer = (ByteBuf) msg;
} else {
ReferenceCountUtil.release(msg);
return;
}
String id = ctx.channel().id().asLongText();
log.debug("[{}] received {} bytes", id, byteBuffer.readableBytes());
// Copy the readable bytes so the pooled buffer can be released before the task runs
byte[] content = new byte[byteBuffer.readableBytes()];
byteBuffer.getBytes(byteBuffer.readerIndex(), content);
ReferenceCountUtil.release(msg);
// Submit to the executor so that the netty worker thread is freed while thrift processing runs
executor.submit(() -> process(content, ctx));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
log.error("Exception caught: {}", cause.getMessage());
log.debug("Exception: ", cause);
ctx.close();
}
});
}
});
log.info("About to start netty server on port: {}", port);
Channel ch = b.bind("localhost", port).sync().channel();
ch.closeFuture().sync();
} catch (Exception e) {
log.error("Exception: ", e);
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
protected void process(byte[] content, ChannelHandlerContext ctx) {
log.debug("netty - bytes read: {}", content);
ByteBuffer out = ByteBuffer.allocate(200);
TProtocol inProtocol = factory.getProtocol(new TByteBuffer(ByteBuffer.wrap(content)));
TProtocol outProtocol = factory.getProtocol(new TByteBuffer(out));
try {
processor.process(inProtocol, outProtocol);
} catch (TException e) {
log.error("Exception: ", e);
}
out.flip();
log.debug("Netty - Sending {} bytes", out.limit());
ctx.write(Unpooled.copiedBuffer("HTTP/1.1 200 OK\nContent-Type: application/x-thrift\nContent-Length: " + out.limit() + "\n\n", CharsetUtil.UTF_8));
ctx.write(Unpooled.copiedBuffer(out.asReadOnlyBuffer()));
ctx.flush();
}
/**
* Exposes the protected transport field so the client below can swap in the
* response buffer before calling recv_* on an already-sent request.
*/
public static class TBinaryProtocolExtended extends TBinaryProtocol {
public TBinaryProtocolExtended(TTransport trans) {
super(trans);
}
public void setTransport(TTransport transport) {
trans_ = transport;
}
}
}
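The client below drives this server through an AsyncThriftHttpClient transport that isn't shown in this post. Here is a rough sketch of what such a transport could look like, assuming only what the client code actually touches (a (url, AsyncHttpClient) constructor and a getFuture() that yields the raw response bytes) plus a pre-0.14 Thrift TTransport: buffer whatever the protocol writes, fire a POST on flush(), and hand the response body back as a Future<ByteBuffer>.
package async.http;

import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
import org.asynchttpclient.AsyncHttpClient;
import org.asynchttpclient.Response;

import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.util.concurrent.Future;

/**
 * Sketch of a write-only async thrift transport: the generated send_* call
 * serializes into an in-memory buffer, flush() fires the HTTP POST, and the
 * response bytes are exposed as a future instead of being read back here.
 */
public class AsyncThriftHttpClient extends TTransport {
    private final String url;
    private final AsyncHttpClient httpClient;
    private final ByteArrayOutputStream requestBuffer = new ByteArrayOutputStream();
    private Future<ByteBuffer> future;

    public AsyncThriftHttpClient(String url, AsyncHttpClient httpClient) {
        this.url = url;
        this.httpClient = httpClient;
    }

    @Override
    public boolean isOpen() { return true; }

    @Override
    public void open() { }

    @Override
    public void close() { }

    @Override
    public int read(byte[] buf, int off, int len) throws TTransportException {
        // Responses are read through recv_* after the caller swaps in a TByteBuffer transport
        throw new TTransportException("read not supported; use getFuture()");
    }

    @Override
    public void write(byte[] buf, int off, int len) {
        requestBuffer.write(buf, off, len);
    }

    @Override
    public void flush() {
        // The generated send_*() ends with flush(): post the serialized call and keep the body as a future
        future = httpClient.preparePost(url)
                .setHeader("Content-Type", "application/x-thrift")
                .setBody(requestBuffer.toByteArray())
                .execute()
                .toCompletableFuture()
                .thenApply(Response::getResponseBodyAsByteBuffer);
    }

    public Future<ByteBuffer> getFuture() {
        return future;
    }
}
The important part is that flush() never blocks: AsyncHttpClient's own event loop owns the HTTP exchange, so one caller thread can start as many requests as it likes.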
The client, making lots of calls in parallel:
package async.http;
import ch.qos.logback.classic.Level;
import async.http.NettyThriftServer.TBinaryProtocolExtended;
import com.olashare.matching.common.utils.FutureUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.transport.TByteBuffer;
import org.asynchttpclient.AsyncHttpClient;
import org.asynchttpclient.DefaultAsyncHttpClient;
import org.asynchttpclient.DefaultAsyncHttpClientConfig;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
/**
* Thrift over async http.
*
* @author Gagandeep Singh
*/
@Slf4j
public class ThriftHttpPerf {
public static class SyncServer implements OptimalAssignmentService.Iface {
@Override
public ByteBuffer testApi(short sleepTimeMs, ByteBuffer input) throws TException {
try {
Thread.sleep(sleepTimeMs);
} catch (InterruptedException e) {
// Not expected in this test; log it and return the input anyway
log.error("Exception: ", e);
}
return input;
}
}
static void startHttpServer(int port, int clientTimeout) {
SyncServer syncServer = new SyncServer();
OptimalAssignmentService.Processor<SyncServer> processor = new OptimalAssignmentService.Processor<>(syncServer);
TBinaryProtocol.Factory factory = new TBinaryProtocol.Factory();
ExecutorService executor = Executors.newFixedThreadPool(1100); // The handler blocks in Thread.sleep, so keep more threads than the 1000 in-flight requests
NettyThriftServer server = new NettyThriftServer(processor, factory, executor, port);
server.start();
}
public static void main(String[] args) throws Exception {
ch.qos.logback.classic.Logger root = (ch.qos.logback.classic.Logger) org.slf4j.LoggerFactory.getLogger(ch.qos.logback.classic.Logger.ROOT_LOGGER_NAME);
root.setLevel(Level.INFO);
new Thread(() -> startHttpServer(9997, 1000)).start();
Thread.sleep(1000);
AsyncHttpClient asyncHttpClient = new DefaultAsyncHttpClient(new DefaultAsyncHttpClientConfig.Builder()
.setSoReuseAddress(true)
.setKeepAlive(true)
.setReadTimeout(1000)
.setConnectTimeout(100)
.build());
// Thrift async
String url = "http://localhost:9997";
int numIterations = 200;
int parallelism = 1000;
String content = "Hello";
for (int i = 0; i < numIterations; i++) {
long now = System.currentTimeMillis();
List<OptimalAssignmentService.Client> clients = new ArrayList<>();
List<TBinaryProtocolExtended> protocols = new ArrayList<>();
List<Future<ByteBuffer>> futures = new ArrayList<>(parallelism);
for (int j = 0; j < parallelism; j++) {
AsyncThriftHttpClient tHttpClient = new AsyncThriftHttpClient(url, asyncHttpClient);
TBinaryProtocolExtended tProtocol = new TBinaryProtocolExtended(tHttpClient);
OptimalAssignmentService.Client client = new OptimalAssignmentService.Client(tProtocol);
client.send_testApi((short) 100, ByteBuffer.wrap(content.getBytes()));
futures.add(tHttpClient.getFuture());
protocols.add(tProtocol);
clients.add(client);
}
// Wait for full response
List<ByteBuffer> responses = FutureUtil.waitForAllFutures(futures);
for (int j = 0; j < parallelism; j++) {
TBinaryProtocolExtended tProtocol = protocols.get(j);
OptimalAssignmentService.Client client = clients.get(j);
tProtocol.setTransport(new TByteBuffer(responses.get(j)));
ByteBuffer buffer = client.recv_testApi();
assert buffer.limit() == content.length();
}
log.info("Time taken in testApi: {}", System.currentTimeMillis() - now);
}
System.exit(0);
}
}
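FutureUtil.waitForAllFutures comes from an internal utils package; for this test all it has to do is block until every response future has completed and return the bodies in order. A minimal stand-in with the same signature (assumed here, since the real class isn't shown) would be:
package async.http;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Future;

/** Minimal stand-in for the FutureUtil helper used in the benchmark above. */
public class FutureUtil {

    /** Blocks until every future completes and returns the results in submission order. */
    public static <T> List<T> waitForAllFutures(List<Future<T>> futures) {
        List<T> results = new ArrayList<>(futures.size());
        for (Future<T> future : futures) {
            try {
                results.add(future.get());
            } catch (Exception e) {
                // Any failed call fails the whole iteration of the benchmark
                throw new RuntimeException(e);
            }
        }
        return results;
    }
}
The send_testApi / recv_testApi split is what keeps the caller side non-blocking: serialization happens up front, the 1000 HTTP exchanges run concurrently inside AsyncHttpClient, and recv_testApi just deserializes from the buffered response once the transport has been swapped to a TByteBuffer.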