Недавно познакомился с фреймворком Netty. В традиционной модели на каждое соединение создается один поток, который его обрабатывает. Это приводит к росту нагрузки на CPU и объема потребляемой памяти. Netty использует другой подход, позволяющий обрабатывать одновременно количество соединений большее, чем количество запущенных потоков. Реализован он с использованием библиотеки java.nio. В данной статье, я опишу как Netty работает и расскажу как реализовать с его помощью простой REST сервис.
Для понимания принципа работы Netty с начало нужно обратиться к паттерну pipeline (Подробная статья с примером на .net). Если коротко, то данные проходят несколько этапов обработки. На каждом этапе они обрабатываются определенным обработчиком — handler’ом. Обработчик поочередно производит действия над данными, доставая их из своей входящей очереди. Как только handler закончил выполнение, полученный результат он отправляет следующему, у которого тоже есть очередь. Получается, что то вроде конвейера.

В Netty запросы обрабатываются похожим образом.
Теперь напишем REST сервис. У которого всего один метод sum — вычисляющий сумму двух чисел, переданных в качестве параметров GET /sum?first=<первое число>&second=<второе число>
. Ответ будет приходить формата plain text в следующем виде : результат:<сумма двух чисел>
.
Для начала добавим соответствующую зависимость. В этом примере используется версия 4.0.5.Final.
<dependencies>
...
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.0.15.Final</version>
</dependency>
</dependencies>
Теперь напишем handler’ы для обработки данных.
FilterHandler
производит фильтрацию запросов, проверку параметров и отсеивание неверных. Его реализация:
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageDecoder;
import io.netty.handler.codec.http.*;
import java.util.List;
import java.util.regex.*;
public class FilterHandler extends MessageToMessageDecoder<DefaultFullHttpRequest> {
private static final String PREFIX_URL = "/sum";
private static final Pattern URL_PATTERN_FILTER = Pattern.compile(PREFIX_URL + "(?:\\?first=(\\d+)&second=(\\d+))");
@Override
protected void decode(ChannelHandlerContext ctx, DefaultFullHttpRequest request, List<Object> out) throws Exception {
if (request.getMethod() != HttpMethod.GET) {
HttpServer.sendError(ctx, "метод к данному ресурсу не применим", HttpResponseStatus.NOT_ACCEPTABLE);
return;
}
String url = request.getUri();
url = url == null ? "" : url.toLowerCase();
if (!url.startsWith(PREFIX_URL)) {
HttpServer.sendError(ctx, "ресурс не найден", HttpResponseStatus.NOT_FOUND);
return;
}
Matcher matcher = URL_PATTERN_FILTER.matcher(url);
if (!matcher.find())
HttpServer.sendError(ctx, "некорретно указаны параметры", HttpResponseStatus.BAD_REQUEST);
try {
long firstNumber = Long.parseLong(matcher.group(1));
long secondNumber = Long.parseLong(matcher.group(2));
request.headers().add("firstNumber", firstNumber);
request.headers().add("secondNumber", secondNumber);
out.add(request);
request.retain();
} catch (NumberFormatException e) {
HttpServer.sendError(ctx, "неверный формат параметров", HttpResponseStatus.BAD_REQUEST);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
HttpServer.sendError(ctx, "ошибка сервера:" + cause.getMessage(), HttpResponseStatus.INTERNAL_SERVER_ERROR);
}
}
После получения запроса проверим тот ли тип метода, к тому ли ресурсу происходит обращение и с теми ли параметрами. Полученные значения параметров записываем в заголовок запроса. А сам запрос добавляем в очередь, для обработки следующим handler’ом.
Следом реализуем WorkerHandler. Он вычисляет сумму и отправлять результат.
import io.netty.buffer.*;
import io.netty.channel.*;
import io.netty.handler.codec.http.*;
import java.nio.charset.StandardCharsets;
public class WorkerHandler extends ChannelInboundHandlerAdapter {
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
HttpServer.sendError(ctx, "ошибка сервера:" + cause.getMessage(), HttpResponseStatus.INTERNAL_SERVER_ERROR);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object obj) throws Exception {
DefaultFullHttpRequest request = (DefaultFullHttpRequest) obj;
int firstNumber = Integer.parseInt(request.headers().get("firstNumber"));
int secondNumber = Integer.parseInt(request.headers().get("secondNumber"));
long sum = firstNumber + secondNumber;
ByteBuf content = Unpooled.copiedBuffer("результат:" + sum, StandardCharsets.UTF_8);
DefaultFullHttpResponse response =
new DefaultFullHttpResponse(
HttpVersion.HTTP_1_1,
HttpResponseStatus.OK,
content);
response.headers().set(HttpHeaders.Names.ACCESS_CONTROL_ALLOW_ORIGIN, "*");
response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/plain");
response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, response.content().readableBytes());
response.headers().set(HttpHeaders.Names.ACCEPT_CHARSET, StandardCharsets.UTF_8.name());
ChannelFuture channelFuture = ctx.writeAndFlush(response);
channelFuture.addListener(ChannelFutureListener.CLOSE);
request.release();
}
}
Теперь осталось самое малое, создать сервер, указать параметры и добавить в его pipeline наши обработчики.
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.*;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.*;
import io.netty.handler.codec.http.cors.*;
import java.nio.charset.StandardCharsets;
public class HttpServer {
public static void main(String[] args) throws Exception {
EventLoopGroup workerGroup = new NioEventLoopGroup();
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
ChannelFuture channelFuture = null;
try {
ServerBootstrap server = new ServerBootstrap()
.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
private final CorsConfig corsConfig = CorsConfig
.anyOrigin()
.allowNullOrigin()
.allowCredentials().build();
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
.addLast(new HttpResponseEncoder())
.addLast(new HttpRequestDecoder())
.addLast(new HttpObjectAggregator(Integer.MAX_VALUE))
.addLast(new CorsHandler(corsConfig))
.addLast(new FilterHandler())
.addLast(new WorkerHandler());
}
})
.option(ChannelOption.SO_BACKLOG, 500)
.childOption(ChannelOption.SO_KEEPALIVE, true);
channelFuture = server.bind("localhost", 8080).sync();
channelFuture.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
if (channelFuture != null) channelFuture.channel().close().awaitUninterruptibly();
}
}
public static void sendError(ChannelHandlerContext ctx, String errorMessage, HttpResponseStatus status) {
ByteBuf content = Unpooled.copiedBuffer(errorMessage, StandardCharsets.UTF_8);
FullHttpResponse response =
new DefaultFullHttpResponse(
HttpVersion.HTTP_1_1,
status,
content);
response.headers().set(HttpHeaders.Names.ACCESS_CONTROL_ALLOW_ORIGIN, "*");
response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/plain");
response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, response.content().readableBytes());
response.headers().set(HttpHeaders.Names.ACCEPT_CHARSET, StandardCharsets.UTF_8.name());
ChannelFuture channelFuture = ctx.writeAndFlush(response);
channelFuture.addListener(ChannelFutureListener.CLOSE);
}
}
В Netty существуют предустановленные handler’ы. В pipeline первые три: HttpResponseEncoder, HttpRequestDecoder, HttpObjectAggregator
нужны лишь для того, чтобы сразу работать с объектом DefaultFullHttpRequest
, иначе пришлось бы в ручную из набора байтов собирать запрос. CorsHandler
служит для управления доступом к ресурсу (Подробнее, можно почитать здесь). Следом идут, созданные нами handler’ы.
Замечания:
- Стоит обратить на некоторые нюансы. После записи ответа, обязательно нужно вызвать метод
flush()
и закрыть созданный при этомChannelFuture
. Если этого не сделать сервер будет держать соединение, пока оно не отвалится по timeout. - Важная деталь, в Netty используется механизм подсчета ссылок. Программист должен сам увеличивать количество ссылок вызовом метода
retain()
если объект будет использоваться где-то еще, илиrelease()
если действие над объектом закончены. Неправильное использование может привести к возникновению исключенияIllegalReferenceCountException
в лучшем случае, или же к утечкам памяти в худшем. (статья на официальном сайте) - Еще одна полезная функция в netty — ограничение скорости входящего и исходящего трафика. Можно избежать ситуаций saturated outbound, просто добавив в конце pipeline обработчик типа
ChannelTrafficShapingHandler
.