Netty简单入门

笔者最近在看 Netty 相关的东西,想把过程中所学到的和感悟记录下来,于是决定单独开一个专栏,专门记录 Netty 相关的文章。

第一篇就从「简单入门」开始吧!!!

Netty 简介

Netty 是由 JBOSS 提供的一个 java 开源框架,现为 Github 上的独立项目。Netty 提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。

提取句子主干,首先,Netty 是一个网络应用程序框架,也可以理解为网络 IO 框架。利用 Netty,开发者可以快速开发出一个高性能、高可靠的网络服务器或客户端程序。

例如,你要开发一个 RPC 框架,生产者需要暴露服务,消费者需要调用服务。生产者和消费者之间如何通信呢?使用什么协议通信呢?双方通信的 IO 模型如何定义呢?通过 Netty 就可以快速实现。

Netty 的特点就是异步的、事件驱动的、高性能的,下面分别说下。

异步

在 Netty 中,所有的 IO 操作都是异步的,这意味着如:接收请求,Channel 数据的读写等操作都不会阻塞,Netty 会返回一个 ChannelFuture,它是一个异步操作的结果占位符。如果开发者就是想同步调用怎么办?通过调用ChannelFuture.sync()可以异步转同步,但是非常不建议这么做,它会阻塞当前线程,这和高性能是相悖的。

ChannelFuture 的接口定义:

public interface ChannelFuture extends Future<Void>

泛型是 Void,这意味着你并不能通过 ChannelFuture 获取到操作操作的结果,但是你可以通过addListener()来注册回调,Netty 会在异步操作完成时触发回调,这时你可以知道操作是否成功,以决定后续的操作。

Netty 官方推荐使用addListener()注册监听来获取结果,而不是调用await()await()会阻塞当前线程,这不仅浪费了当前线程资源,而且线程间的切换和数据同步需要较大的开销。另外需要特别注意的是:不要在 ChannelHandler 中调用await(),Channel 整个生命周期事件都由一个唯一绑定的 EventLoop 线程处理,执行 ChannelHandler 逻辑的也是 EventLoop,调用await()相当于线程本身在等待自己操作完成的一个结果,这会导致死锁。

相比之下,addListener()是完全非阻塞的,它会注册一个监听到 Channel,当异步操作完成时,EventLoop 会负责触发回调,性能是最优的。

事件驱动

Netty 程序是靠事件来驱动执行的。

Netty 使用不同的事件来通知我们,我们可以根据已经发生的事件来执行对应的动作逻辑。

Netty 是一个网络 IO 框架,所以事件可以按照出站和入站进行分类:

  • 入站
    1. 连接已激活/失活。
    2. 有数据可以读取。
    3. 用户自定义事件。
    4. 异常事件。
  • 出站
    1. 打开/关闭到远程节点的连接。
    2. 将数据 write/flush 到 Socket。

当 Channel 被注册到 EventLoop 后,该 EventLoop 会开启线程不断轮询,直到 Channel 有事件发生。有事件发生时,EventLoop 会触发相应的回调,通过 ChannelPipeline 进行事件的传播。Netty简单入门

高性能

Netty 开发服务端程序时,面对海量客户端连接,还必须保证高性能。

Netty 为了高性能做了很多努力和优化,这里简单列下,后面会详细说明,包括但不仅限于:

  1. 非阻塞的 Nio 编程,主从 Reactor 线程模型,只需少量线程即可应对海量连接。
  2. 基于引用计数算法的内存池化技术,避免 ByteBuf 的频繁创建和销毁。
  3. 更少的内存复制:
    • Socket 读写数据使用堆外内存,避免内存拷贝。
    • CompositeByteBuf 组合 ByteBuf,实现数据零拷贝。
    • 文件传输 FileRegion 避免内存拷贝。
  1. 局部无锁化,EventLoop 串行执行事件和任务,避免了线程竞争和数据同步。
  2. Netty 实现的 MpscQueue 高性能无锁队列。
  3. 反射替换 SelectorImpl 的 selectedKeys,将 HashSet 替换为数组,避免哈希冲突。
  4. FastThreadLocal 使用数组代替 Hash 表,带来更好的访问性能。
  5. … …想到再补充。

Netty 的组件

Channel

Channel 译为「通道」,它代表一个到实体(文件、硬件、Socket 等)的开放连接,例如针对网络有 SocketChannl,针对文件有 FileChannel 等。既然是通道,就代表它可以被打开,也可以被关闭,

Netty 没有使用 JDK 原生的 Channel,而是自己封装了一个,这样可以为客户端和服务端 Channel 提供一个统一的视图,使用起来更加方便。

Channel 分为两大类:

  1. 服务端 ServerSocketChannel,负责绑定本地端口,监听客户端的连接请求。
  2. 客户端 SocketChannel,负责和远程节点建立连接。

在网络编程模型中, 服务端和客户端进行 IO 数据交互的媒介就是 Channel,Channel 被打开的目的就是与对端进行数据交换,你可以通过 Channel 来给对端发送数据,和从对端读取数据。

常用的 Channel 实现如下:Netty简单入门

EventLoopGroup 和 EventLoop

EventLoopGroup 本身并不干活,它负责管理一组 EventLoop 的启动和停止,它提供一个next()方法从一组 EventLoop 线程中挑选出一个来执行任务。

EventLoopGroup 的next()方法依赖一个EventExecutorChooser选择器,通过选择器来从一组 EventLoop 中进行选择,Netty 默认的策略就是简单轮询,源码如下:

/*
创建一个选择器,从一组EventExecutor中挑选出一个。
Netty默认的选择策略就是:简单轮询。
*/

@Override
public EventExecutorChooser newChooser(EventExecutor[] executors) {
    // 两种Chooser实现都有一个AtomicLong计数器,每次next()先自增再取余

    // 如果数量是2的幂次方数,则采用位运算
    if (isPowerOfTwo(executors.length)) {
        return new PowerOfTwoEventExecutorChooser(executors);
    } else {
        // 否则,对长度进行取余
        return new GenericEventExecutorChooser(executors);
    }
}

// 2的幂次方数的选择器,位运算
private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
    private final AtomicInteger idx = new AtomicInteger();
    private final EventExecutor[] executors;

    PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {
        this.executors = executors;
    }

    @Override
    public EventExecutor next() {
        // 计数器自增 & 长度-1,和HashMap一样
        return executors[idx.getAndIncrement() & executors.length - 1];
    }
}

// 普通的选择器,取余
private static final class GenericEventExecutorChooser implements EventExecutorChooser {
    private final AtomicLong idx = new AtomicLong();
    private final EventExecutor[] executors;

    GenericEventExecutorChooser(EventExecutor[] executors) {
        this.executors = executors;
    }

    @Override
    public EventExecutor next() {
        return executors[(int) Math.abs(idx.getAndIncrement() % executors.length)];
    }
}

EventLoopGroup 管理的一组 EventLoop 应该趋向于处理同一类任务和事件,例如开发服务端程序,Netty 官方推荐的 Reactor 主从线程模型需要两个 EventLoopGroup:Boss 和 Worker,Boss 专门负责接收客户端的连接,连接建立后,Boss 会将客户端 Channel 注册到 Worker 中,由 Worker 来负责后续的数据读写事件。

EventLoopGroup 可以理解为是一个多线程的线程池,而 EventLoop 则是一个单线程的线程池,也是真正干活的角色。

EventLoop 不仅可以处理 Channel 的 IO 事件,还可以执行用户提交的系统任务,因为它本身就是个线程池。此外,它还实现了ScheduledExecutorService接口,因此它还可以执行定时任务。最常见的应用场景就是:你可以每隔一段时间检测一下客户端连接是否断开!

EventLoop 是如何工作的呢?

拿最常用的 NioEventLoop 来说,它内部会持有一个Selector多路复用器,初始化时,Selector也会被一同创建,然后当有任务被提交到 NioEventLoop 时,它会利用ThreadPerTaskExecutor创建一个线程执行run()方法。核心就在run()方法里,它会不断的轮询,检查Selector上是否有准备就绪的 Channel 需要处理,如果有则根据SelectionKey的事件类型触发相应的事件回调,并通过ChannelPipeline将事件传播出去。如果没有准备就绪的 Channel,则去检查taskQueue中是否有待处理的系统任务、或定时任务,如果有则执行,否则就阻塞在Selector.select()上,等待准备就绪的 Channel。

这里就简单过一下吧,后面会有源码解析的文章,敬请期待!

ChannelFuture

前面已经说过,Netty 是完全异步的 IO 框架,它所有的操作都会立即返回,不会阻塞在那里,这对于习惯了同步编程的同学可能要适应一下。你不能再调用一个方法,得到一个结果,根据结果判断再去执行后面的操作。因为此时异步操作可能还没有执行完,ChannelFuture 还没有结果。

ChannelFuture 只是一个异步操作的结果占位符,它代表未来可能会发生的一个结果,这个结果可能是执行成功,或是执行失败得到一个异常信息。

你可以通过调用await()阻塞等待这个操作完成,但是 Netty 不建议这么去做,这样会阻塞当前线程,浪费线程资源,而且线程间的切换和数据同步都是一个较大的开销。

Netty 推荐使用addListener()来注册一个回调,当操作执行完成/异常时,ChannelFuture 会向 EventLoop 提交任务来触发回调,你可以在回调方法里根据操作结果来执行后面的业务逻辑。回调和任务是由同一个线程驱动的,这样就避免了线程间数据同步的问题,性能是最好的。

ChannelPromise 和 ChannelFuture 的区别?ChannelPromise 是 ChannelFuture 的子类,是一个特殊的可写的 ChannelFuture。前面说过 ChannelFuture 代表未来操作的一个结果占位符,使用 ChannelFuture 你只能乖乖等待结果完成然后触发回调,这个结果是由 Netty 来设置,它没有提供可写操作。而 ChannelPromise 就不同了,它提供了手动设置结果的 API:setSuccess()setFailure(),结果只能设置一次,设置完后会自动触发回调。

入站事件处理器ChannelInboundHandler所有操作都不需要提供 ChannelPromise,因为这些回调是由 Netty 来主动触发的。而出站事件处理器ChannelOutboundHandler很多操作都需要提供一个 ChannelPromise,当出站数据处理完成时,你需要往 ChannelPromise 设置结果来通知回调。

ChannelHandler

ChannelHandler 是 Netty 的事件处理器,根据数据的流向,分为入站、出站两大类。

  • ChannelInboundHandler:入站事件处理器
  • ChannelOutboundHandler:出站事件处理器

当一个 ChannelHandler 被添加到 Channel 的 Pipeline 后,只要 EventLoop 轮询到 Channel 有事件发生时,就会根据事件类型触发相应的回调。例如:收到对端发送的数据,Channel 有数据可读时,会触发channelRead()方法。你要做的就是实现 ChannelHandler 类,重写channelRead()方法,Netty 会将读取到的数据包装成ByteBuf,至于拿到数据要做哪些事,那就是你的业务了。对于 Netty 开发者来说,你的主要工作,就是开发 ChannelHandler,实现 ChannelHandler 类,重写你感兴趣的事件即可。

常用的类有:ChannelInboundHandlerAdapter 和 ChannelOutboundHandlerAdapter,分别处理入站和出站事件的,默认全部通过ctx.fireXXX()无脑向后传播,你只需要重写你感兴趣的事件,不用被迫重写所有方法了。

继承SimpleChannelInboundHandler类,你只需要重写channelRead0()方法,它会在该方法执行完毕后自动释放内存,防止内存泄漏,这一块后面会详细说明。

其他的就是 Netty 内置的一些开箱即用的编解码器,可以针对公有协议(如 HTTP)进行编解码,处理读/写半包的问题,SslHandler 针对读写数据进行加解密等等。

需要注意的点:

  1. Netty 使用池化技术来复用 ByteBuf 对象,使用完毕后切记及时释放资源。
  2. 如果你需要将事件传播下去,必须手动触发fireXXX()方法,Pipeline 可不会自动帮你传递。
  3. 数据读取需要注意:粘包/拆包 问题。
  4. 注意write()消息积压问题。

ChannelPipeline

Pipeline 译为「管道」,如果把 网络数据 比作水,那它就像 水管 一样,读入的数据从管道的头部流入,经过一系列 ChannelInboundHandler 处理,写出的数据从管道的尾部流入,经过一系列 ChannelOutboundHandler 处理,最终通过 Socket 发送给对端。

ChannelPipeline 是 ChannelHandler 的容器,默认实现DefaultChannelPipeline是一个双向链表,头节点始终是 HeadContext,尾节点始终是 TailContext(头尾节点有它们自己的职责所在),你可以往中间添加你自定义的 ChannelHandler。

HeadContext 头节点的职责:对于入站事件,它会无脑向后传播,确保你定义的 ChannelHandler 事件会被触发,对于出站事件,它会转交给 Channel.Unsafe 执行,例如bindwrite等,因为这些操作是偏底层的,需要和底层类打交道,Netty 不希望开发者去调用这些方法。

TailContext 尾节点的职责:对于出站事件,当然是无脑向后传递了,但是对于入站事件,如果前面的 Handler 没有释放读取的数据资源,TailContext 会自动释放,避免内存泄漏。对于异常,如果前面的 Handler 没有处理,TailContext 会打印日志记录下来,提醒开发者需要处理异常。

Bootstrap 和 ServerBootstrap

Netty 的引导类,Bootstrap 是客户端的引导类,ServerBootstrap 是服务端的引导类。

一个 Netty 服务的运行需要多个组件互相配合,使用 Bootstrap 可以快速组装这些组件,让它们协同运行。当然,你也可以脱离 Bootstrap,自己去引导服务,只是完全没有必要而已。

Bootstrap 的创建非常简单,默认的构造器不需要你传任何参数,这是因为它需要的参数很多,可能以后的版本还会发生改变,因此 Netty 使用建造者 Builder 模式来构建 Bootstrap。

Bootstrap 本身逻辑很简单,它只是负责组装组件,核心的逻辑都在各个组件里。下面是一个 ServerBootstrap 的标准启动代码示例,这篇文章先简单带过,后面会有详细的源码解析。

public class Server {
 public static void main(String[] args) {
  /*
  NioEventLoopGroup创建流程:
   1.创建ThreadPerTaskExecutor,利用FastThreadLocalThread来提升FastThreadLocal的性能
   2.初始化children,创建一组NioEventLoop(此时未创建线程)
   3.创建EventExecutorChooser,需要时从Group中轮询出一个EventLoop来执行任务
   */

  final NioEventLoopGroup boss = new NioEventLoopGroup(1);
  final NioEventLoopGroup worker = new NioEventLoopGroup();
  /*
  NioEventLoop流程:
   1.创建两个任务队列:taskQueue、tailTaskQueue
   2.openSelector()创建多路复用器Selector
   3.run()轮询Selector、taskQueue,串行处理IO事件和Task
  懒启动,只有在第一次execute()提交任务时才会利用executor创建线程
  对于Boss来说,线程启动是在调用bind()时,提交一个register()任务
  对于Worker,线程启动是在Boss接收到客户端连接时,提交一个register()任务
   */

  new ServerBootstrap()
    .group(boss, worker)
    .option(ChannelOption.SO_BACKLOG, 100)
    //.attr(null,null)
    .childOption(ChannelOption.SO_TIMEOUT, 1000)
    //.childAttr(null,null)
    .channel(NioServerSocketChannel.class)
    .childHandler(new ChannelInitializer<SocketChannel>() 
{
     @Override
     protected void initChannel(SocketChannel ch) throws Exception {
      ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
       @Override
       public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ByteBuf buf = Unpooled.wrappedBuffer("hello".getBytes());
        ctx.writeAndFlush(buf);
       }
      });
     }
    }).bind(9999);
 }
}

第一个 Netty 程序

OK,前面介绍完 Netty 的组件,现在我们就基于这些组件,来写第一个 Netty 程序。

下面是一个 Echo 服务实例,如果有新的客户端接入,服务端会打印一句话,如果客户端向服务端发送数据,服务端会打印数据内容,并将数据原样写回给客户端,一个非常简单的程序。

EchoServer服务端标准实现:

public class EchoServer {
 // 绑定的端口
 private final int port;

 public EchoServer(int port) {
  this.port = port;
 }

 public static void main(String[] args) {
  // 启动Echo服务
  new EchoServer(9999).start();
 }

 public void start() {
  /*
  bossGroup负责客户端的接入
  workerGroup负责IO数据的读写
   */

  NioEventLoopGroup boss = new NioEventLoopGroup(1);
  NioEventLoopGroup worker = new NioEventLoopGroup();
  new ServerBootstrap()
    .group(boss, worker)
    .channel(NioServerSocketChannel.class)
    .childHandler(new ChannelInitializer<SocketChannel>() 
{
     @Override
     protected void initChannel(SocketChannel sc) throws Exception {
      sc.pipeline().addLast(new ChannelInboundHandlerAdapter(){

       @Override
       public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
        System.out.println("有新的客户端连接...");
       }

       @Override
       public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        /*
        原样写回给客户端,因为OutBoundHandler还要使用,因此不能释放msg。
        底层数据写完后会自动释放。
         */

        byte[] bytes = ByteBufUtil.getBytes(((ByteBuf) msg));
        System.out.println("接受到数据:" + new String(bytes));
        ctx.writeAndFlush(msg);
       }

       @Override
       public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // 出现异常了
        cause.printStackTrace();
        ctx.channel().close();
       }
      });
     }
    })
    .bind(port);
 }
}

EchoClient标准实现:

public class EchoClient {
 private final String host;//远程IP
 private final int port;//远程端口

 public EchoClient(String host, int port) {
  this.host = host;
  this.port = port;
 }

 public static void main(String[] args) {
  new EchoClient("127.0.0.1"9999).start();
 }

 public void start() {
  // 客户端只需要一个WorkerGroup
  NioEventLoopGroup worker = new NioEventLoopGroup();
  new Bootstrap()
    .group(worker)
    .channel(NioSocketChannel.class)
    .handler(new ChannelInitializer<SocketChannel>() 
{
     @Override
     protected void initChannel(SocketChannel sc) throws Exception {
      sc.pipeline().addLast(new ChannelInboundHandlerAdapter() {
       @Override
       public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
        System.out.println("连接建立,开始发送【hello】...");
        ctx.writeAndFlush(Unpooled.wrappedBuffer("hello".getBytes()));
       }

       @Override
       public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        String data = ((ByteBuf) msg).toString(Charset.defaultCharset());
        System.out.println("收到服务端数据:" + data);
       }
      });
     }
    }).connect(host, port);//连接服务端
 }
}

如上,只需少量代码就可以快速开发出一个 Echo 服务,Netty 向开发者屏蔽了底层实现,你甚至都不需要知道Selector多路复用器,Channel 是何时注册到Selector上的?Netty 是如何处理 IO 事件的?网络数据是如何被读入的?又是如何被写出的?你只需要开发 ChannelHandler,写好回调逻辑,等待 Netty 调用即可。

如果使用 JDK 原生类网络编程,Bio 和 Nio 两种不同的模式代码风格差异非常大,如果需要在两者之间做切换,工作量非常巨大。而 Netty 就显得非常灵活,只需要将NioEventLoopGroupNioSocketChannel换成OioEventLoopGroupOioSocketChannel即可快速切换,这是 Netty 易用性和灵活性的极好体现。

总结

Netty 作为事件驱动的异步 IO 框架,在保证高性能的同时,还拥有非常好的灵活性和可扩展性。使用 Netty 你可以快速构建你的网络服务,或者开发一个框架,需要进行节点间的通信和数据传输,使用 Netty 来帮助你完成底层的通信是非常方便和高效的。例如阿里的 Dubbo、Facebook 的 Thrift 等 RPC 框架都使用 Netty 来完成底层的通信,你甚至可以使用 Netty 来定制一套你们公司内部的私有协议,非常酷!

入门就先写到这里吧,后面会有 Netty 服务端启动全流程的源码分析,一步一步看看 Netty 到底干了什么,后面针对 Netty 对高性能所做出的努力也会单独写篇文章,包括FastThreadLocal也会分析源码,看看 Netty 是如何提升ThreadLocal的性能的。敬请期待吧!!!


原文始发于微信公众号(程序员小潘):Netty简单入门

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之家整理,本文链接:https://www.bmabk.com/index.php/post/29370.html

(0)
小半的头像小半

相关推荐

发表回复

登录后才能评论
极客之家——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!