Недавно познакомился с фреймворком Netty. В традиционной модели на каждое соединение создается один поток, который его обрабатывает. Это приводит к росту нагрузки на CPU и объема потребляемой памяти. Netty использует другой подход, позволяющий обрабатывать одновременно количество соединений большее, чем количество запущенных потоков. Реализован он с использованием библиотеки java.nio. В данной статье, я опишу как Netty работает и расскажу как реализовать с его помощью простой REST сервис.

Для понимания принципа работы Netty с начало нужно обратиться к паттерну pipeline (Подробная статья с примером на .net). Если коротко, то данные проходят несколько этапов обработки. На каждом этапе они обрабатываются определенным обработчиком — handler’ом. Обработчик поочередно производит действия над данными, доставая их из своей входящей очереди. Как только handler закончил выполнение, полученный результат он отправляет следующему, у которого тоже есть очередь. Получается, что то вроде конвейера.

В Netty запросы обрабатываются похожим образом.

Теперь напишем REST сервис. У которого всего один метод sum — вычисляющий сумму двух чисел, переданных в качестве параметров GET /sum?first=<первое число>&second=<второе число>. Ответ будет приходить формата plain text в следующем виде : результат:<сумма двух чисел>.

Для начала добавим соответствующую зависимость. В этом примере используется версия 4.0.5.Final.

 <dependencies>
  ...
  <dependency>
       <groupId>io.netty</groupId>
       <artifactId>netty-all</artifactId>
       <version>4.0.15.Final</version>
  </dependency>
</dependencies>

Теперь напишем handler’ы для обработки данных.

FilterHandler производит фильтрацию запросов, проверку параметров и отсеивание неверных. Его реализация:

import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageDecoder;
import io.netty.handler.codec.http.*;
import java.util.List;
import java.util.regex.*;

public class FilterHandler extends MessageToMessageDecoder<DefaultFullHttpRequest> {
    private static final String PREFIX_URL = "/sum";
    private static final Pattern URL_PATTERN_FILTER = Pattern.compile(PREFIX_URL + "(?:\\?first=(\\d+)&second=(\\d+))");

    @Override
    protected void decode(ChannelHandlerContext ctx, DefaultFullHttpRequest request, List<Object> out) throws Exception {

        if (request.getMethod() != HttpMethod.GET) {
            HttpServer.sendError(ctx, "метод к данному ресурсу не применим", HttpResponseStatus.NOT_ACCEPTABLE);
            return;
        }

        String url = request.getUri();
        url = url == null ? "" : url.toLowerCase();
        if (!url.startsWith(PREFIX_URL)) {
            HttpServer.sendError(ctx, "ресурс не найден", HttpResponseStatus.NOT_FOUND);
            return;
        }

        Matcher matcher = URL_PATTERN_FILTER.matcher(url);
        if (!matcher.find())
            HttpServer.sendError(ctx, "некорретно указаны параметры", HttpResponseStatus.BAD_REQUEST);

        try {
            long firstNumber = Long.parseLong(matcher.group(1));
            long secondNumber = Long.parseLong(matcher.group(2));

            request.headers().add("firstNumber", firstNumber);
            request.headers().add("secondNumber", secondNumber);
            out.add(request);
            request.retain();

        } catch (NumberFormatException e) {
            HttpServer.sendError(ctx, "неверный формат параметров", HttpResponseStatus.BAD_REQUEST);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        HttpServer.sendError(ctx, "ошибка сервера:" + cause.getMessage(), HttpResponseStatus.INTERNAL_SERVER_ERROR);
    }
}

После получения запроса проверим тот ли тип метода, к тому ли ресурсу происходит обращение и с теми ли параметрами. Полученные значения параметров записываем в заголовок запроса. А сам запрос добавляем в очередь, для обработки следующим handler’ом.

Следом реализуем WorkerHandler. Он вычисляет сумму и отправлять результат.

import io.netty.buffer.*;
import io.netty.channel.*;
import io.netty.handler.codec.http.*;

import java.nio.charset.StandardCharsets;


public class WorkerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        HttpServer.sendError(ctx, "ошибка сервера:" + cause.getMessage(), HttpResponseStatus.INTERNAL_SERVER_ERROR);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object obj) throws Exception {
        DefaultFullHttpRequest request = (DefaultFullHttpRequest) obj;

        int firstNumber = Integer.parseInt(request.headers().get("firstNumber"));
        int secondNumber = Integer.parseInt(request.headers().get("secondNumber"));

        long sum = firstNumber + secondNumber;

        ByteBuf content = Unpooled.copiedBuffer("результат:" + sum, StandardCharsets.UTF_8);
        DefaultFullHttpResponse response =
                new DefaultFullHttpResponse(
                        HttpVersion.HTTP_1_1,
                        HttpResponseStatus.OK,
                        content);

        response.headers().set(HttpHeaders.Names.ACCESS_CONTROL_ALLOW_ORIGIN, "*");
        response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/plain");
        response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, response.content().readableBytes());
        response.headers().set(HttpHeaders.Names.ACCEPT_CHARSET, StandardCharsets.UTF_8.name());

        ChannelFuture channelFuture = ctx.writeAndFlush(response);
        channelFuture.addListener(ChannelFutureListener.CLOSE);

        request.release();
    }
}

Теперь осталось самое малое, создать сервер, указать параметры и добавить в его pipeline наши обработчики.

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.*;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.*;
import io.netty.handler.codec.http.cors.*;
import java.nio.charset.StandardCharsets;

public class HttpServer {
    public static void main(String[] args) throws Exception {
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        ChannelFuture channelFuture = null;
        try {
            ServerBootstrap server = new ServerBootstrap()
                    .group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        private final CorsConfig corsConfig = CorsConfig
                                .anyOrigin()
                                .allowNullOrigin()
                                .allowCredentials().build();

                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline()
                                    .addLast(new HttpResponseEncoder())
                                    .addLast(new HttpRequestDecoder())
                                    .addLast(new HttpObjectAggregator(Integer.MAX_VALUE))
                                    .addLast(new CorsHandler(corsConfig))
                                    .addLast(new FilterHandler())
                                    .addLast(new WorkerHandler());
                        }
                    })
                    .option(ChannelOption.SO_BACKLOG, 500)
                    .childOption(ChannelOption.SO_KEEPALIVE, true);

            channelFuture = server.bind("localhost", 8080).sync();

            channelFuture.channel().closeFuture().sync();

        } finally {
            workerGroup.shutdownGracefully();
            if (channelFuture != null) channelFuture.channel().close().awaitUninterruptibly();
        }
    }

    public static void sendError(ChannelHandlerContext ctx, String errorMessage, HttpResponseStatus status) {
        ByteBuf content = Unpooled.copiedBuffer(errorMessage, StandardCharsets.UTF_8);
        FullHttpResponse response =
                new DefaultFullHttpResponse(
                        HttpVersion.HTTP_1_1,
                        status,
                        content);

        response.headers().set(HttpHeaders.Names.ACCESS_CONTROL_ALLOW_ORIGIN, "*");
        response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/plain");
        response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, response.content().readableBytes());
        response.headers().set(HttpHeaders.Names.ACCEPT_CHARSET, StandardCharsets.UTF_8.name());

        ChannelFuture channelFuture = ctx.writeAndFlush(response);
        channelFuture.addListener(ChannelFutureListener.CLOSE);
    }
}

В Netty существуют предустановленные handler’ы. В pipeline первые три: HttpResponseEncoder, HttpRequestDecoder, HttpObjectAggregator нужны лишь для того, чтобы сразу работать с объектом DefaultFullHttpRequest, иначе пришлось бы в ручную из набора байтов собирать запрос. CorsHandler служит для управления доступом к ресурсу (Подробнее, можно почитать здесь). Следом идут, созданные нами handler’ы.

Замечания:

  • Стоит обратить на некоторые нюансы. После записи ответа, обязательно нужно вызвать метод flush() и закрыть созданный при этом ChannelFuture. Если этого не сделать сервер будет держать соединение, пока оно не отвалится по timeout.
  • Важная деталь, в Netty используется механизм подсчета ссылок. Программист должен сам увеличивать количество ссылок вызовом метода retain() если объект будет использоваться где-то еще, или release() если действие над объектом закончены. Неправильное использование может привести к возникновению исключения IllegalReferenceCountException в лучшем случае, или же к утечкам памяти в худшем. (статья на официальном сайте)
  • Еще одна полезная функция в netty — ограничение скорости входящего и исходящего трафика. Можно избежать ситуаций saturated outbound, просто добавив в конце pipeline обработчик типа ChannelTrafficShapingHandler.