ServerBootstrapAcceptor:接收连接的核心

ServerBootstrapAcceptor 是 Netty 服务端用来接收客户端连接的核心类,之前的文章在分析 Netty 服务端启动全流程的时候有提到过一嘴,今天这篇文章会详细分析一下。

1. 何时被添加到 Pipeline?

服务端启动时,会调用ServerBootstrap.bind()绑定本地端口用来监听客户端的连接,这个方法会通过反射创建 ServerSocketChannel 并初始化,ServerBootstrap.init()会初始化 ServerSocketChannel,将 ServerBootstrapAcceptor 添加到服务端 Channel 的 Pipeline 中,源码如下:

// 服务端Channel初始化
@Override
void init(Channel channel) {// 这里的channel是ServerSocketChannel
 // 设置options
 setChannelOptions(channel, newOptionsArray(), logger);
 // 设置attrs
 setAttributes(channel, newAttributesArray());


 // 初始化ServerSocketChannel的ChannelPipeline
 ChannelPipeline p = channel.pipeline();

 final EventLoopGroup currentChildGroup = childGroup;
 final ChannelHandler currentChildHandler = childHandler;
 // 和ServerSocketChannel建立连接的客户端SocketChannel需要设置的options和attrs
 final Entry<ChannelOption<?>, Object>[] currentChildOptions = newOptionsArray(childOptions);
 final Entry<AttributeKey<?>, Object>[] currentChildAttrs = newAttributesArray(childAttrs);

 /*
 往服务端Channel添加Handler:
  1.封装HandlerAdded回调任务,保存在PendingHandlerCallback
  2.后续的register()操作会触发回调:pipeline.invokeHandlerAddedIfNeeded();
  */

 p.addLast(new ChannelInitializer<Channel>() {
  /*
  initChannel()何时被调用?
   ChannelHandler被添加到Pipeline有一个对应的回调:handlerAdded()
   addLast()会提交一个任务,让EventLoop来触发这个回调
   ChannelInitializer在handlerAdded()回调里会执行该初始化方法。
   */

  @Override
  public void initChannel(final Channel ch) {
   final ChannelPipeline pipeline = ch.pipeline();
   ChannelHandler handler = config.handler();//ServerBootstrap.handler()设置的
   if (handler != null) {
    pipeline.addLast(handler);
   }

   ch.eventLoop().execute(new Runnable() {
    @Override
    public void run() {
     // ServerBootstrapAcceptor是服务端接收客户端连接的核心
     pipeline.addLast(new ServerBootstrapAcceptor(
       ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
    }
   });
  }
 });
}

初始化时,会往 ServerSocketChannel 的 Pipeline 中添加了一个 ChannelInitializer,这里有必要说一下 ChannelInitializer。

ChannelInitializer 是一个特殊的 ChannelHandler,它本身不处理任何出站/入站事件,它的目的仅仅是完成 Channel 的初始化。我们知道,ChannelHandler 被添加到 Pipeline 后会触发一个handlerAdded回调,ChannelInitializer 在这个回调里会调用initChannel()进行初始化,初始化完成后会将自己从 Pipeline 中删除,源码如下:

@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
    if (ctx.channel().isRegistered()) {
        // 初始化Channel
        if (initChannel(ctx)) {
            // 将自己从Pipeline中移除
            removeState(ctx);
        }
    }
}

回到 ServerBootstrapAcceptor,它在初始化 ServerSocketChannel 时会向它的 Pipeline 中添加ServerBootstrap.handler()用户设置的自定义 ChannelHandler,当然你也可以不设置,它会判空校验。添加完用户自定义的 ChannelHandler 后,它会再追加一个 ServerBootstrapAcceptor,这样 ServerBootstrapAcceptor 就可以处理入站事件了,即处理客户端的连接。

2. 源码分析

ServerBootstrapAcceptor 的源码很简单,只有不到小一百行,它的职责也很简单,只负责处理客户端新的连接建立,并把连接注册到 WorkerGroup 中,仅此而已。

通过查看类结构,发现它只实现了 ChannelInboundHandler 接口,这代表它对入站事件感兴趣。本来嘛,它负责新连接的建立,肯定只对OP_ACCEPT事件感兴趣了。ServerBootstrapAcceptor:接收连接的核心

再看下它的属性,也比较简单,因为它负责客户端 Channel 的建立和初始化,因此需要 childChannel 相关的配置信息。

private final EventLoopGroup childGroup;// Reactor模型中的WorkerGroup
private final ChannelHandler childHandler;// 客户端Channel的ChannelHandler
private final Entry<ChannelOption<?>, Object>[] childOptions;// 客户端Channel的Options
private final Entry<AttributeKey<?>, Object>[] childAttrs;// 客户端Channel的Attrs
private final Runnable enableAutoReadTask; // 启用自动读取的任务

构造函数就是基本的赋值操作,这里就不贴代码了。

它只重写了channelReadexceptionCaught方法,代表它只对这两个事件感兴趣。当有 Selector 有OP_ACCEPT事件到达时,NioEventLoop 会接收客户端连接,创建 SocketChannel,并触发channelRead回调,看下它是如何处理新的连接的。

/**
 * 有客户端连接时,触发.
 * NioEventLoop会监听Selector事件,OP_ACCEPT事件到达时,触发Unsafe.read()。
 * {@link AbstractNioMessageChannel.NioMessageUnsafe#read()}
 * 它会调用ServerSocketChannel.accept()获取客户端连接,并触发channelRead()回调。
 */

@Override
@SuppressWarnings("unchecked")
public void channelRead(ChannelHandlerContext ctx, Object msg) {
 final Channel child = (Channel) msg;// 这里的Channel是SocketChannel

 // 设置客户端Channel的Pipeline、Options、Attrs
 child.pipeline().addLast(childHandler);
 setChannelOptions(child, childOptions, logger);
 setAttributes(child, childAttrs);

 try {
  /*
  将客户端Channel注册到WorkerGroup:
   1.next()轮询出一个EventLoop.register()
   2.Channel.Unsafe.register(),Channel注册到Selector
   3.触发各种回调
  Channel一旦注册到EventLoop,就由该EventLoop负责处理它整个生命周期的所有事件。
   */

  childGroup.register(child).addListener(new ChannelFutureListener() {
   @Override
   public void operationComplete(ChannelFuture future) throws Exception {
    // 如果注册失败,强制关闭连接
    if (!future.isSuccess()) {
     // 底层就是调用原生JDK的关闭方法:javaChannel().close();
     forceClose(child, future.cause());
    }
   }
  });
 } catch (Throwable t) {
  forceClose(child, t);
 }
}

大致步骤如下:

  1. 设置 SocketChannel 的 Pipeline。
  2. 设置 Options。
  3. 设置 Attrs。
  4. 将 SocketChannel 注册到 WorkerGroup 中。

Netty 采用的 Reactor 主从线程模型,BossGroup 负责连接的建立,WorkerGroup 负责后续连接的读写。所以 ServerBootstrapAcceptor 在客户端 Channel 连接建立后会将它注册到 WorkerGroup 中。

当整个过程出现异常时,Netty 会强制关闭连接,调用forceClose(),底层还是调用了 JDK 底层的SocketChannel.close()方法关闭连接。

// 强制关闭连接
private static void forceClose(Channel child, Throwable t) {
    /**
    * 底层还是调用了SocketChannel.close()
    * {@link io.netty.channel.socket.nio.NioSocketChannel#doClose()}
    */

    child.unsafe().closeForcibly();
    logger.warn("Failed to register an accepted channel: {}", child, t);
}

ChannelRead 事件异常了,Pipeline 还会传播异常事件,执行exceptionCaught回调。ServerBootstrapAcceptor 面对异常时,会暂停 1 秒停止接受客户端连接,等待 ServerSocketChannel 恢复,并将异常事件传播出去。

// 处理异常事件
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    final ChannelConfig config = ctx.channel().config();
    if (config.isAutoRead()) {
        // 1秒内停止接收新客户端
        config.setAutoRead(false);
        ctx.channel().eventLoop().schedule(enableAutoReadTask, 1, TimeUnit.SECONDS);
    }
    // still let the exceptionCaught event flow through the pipeline to give the user
    // a chance to do something with it
    // 将异常事件传播出去
    ctx.fireExceptionCaught(cause);
}

至此,ServerBootstrapAcceptor 所有的源码都分析完了,超级简单。

3. 何时触发 channelRead()?

上面只对 ServerBootstrapAcceptor 的源码进行了分析,却没有说当有新的连接进来时,Netty 是何时调用的channelRead()方法的,这节会给你答案。

当调用ServerBootstrap.bind()方法时,Netty 会创建 ServerSocketChannel,并把它注册到 BossGroup 的 NioEventLoop 的 Selector 多路复用器,最后再绑定到本地端口。

这样 Netty 就可以接收客户端的连接了,当有新的连接接入时,Selector 会监听到并返回准备就绪的 Channel,NioEventLoop 会处理这些事件,详见NioEventLoop.processSelectedKey()方法。

由于事件类型是OP_ACCEPT,因此会调用Unsafe.read()处理,源码如下:

 // 数据可读、有新的连接接入
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
    // 对于ServerSocketChannel只关心OP_ACCEPT事件
    unsafe.read();
}

这个 Unsafe 接口有两大实现,分别是服务端 Channel 的 Unsafe 和客户端 Channel 的 Unsafe。前者的 read 负责接收 SocketChannel 连接,后者的 read 负责读取对端发送的数据并封装成 ByteBuf。

对于服务端的Unsafe.read(),这里会执行io.netty.channel.nio.AbstractNioMessageChannel.NioMessageUnsafe.read()方法,它会调用 JDK 底层的ServerSocketChannel.accept()接收到客户端的连接后,将其封装成 Netty 的 NioSocketChannel,再通过 Pipeline 将 ChannelRead 事件传播出去,这样 ServerBootstrapAcceptor 就可以在 ChannelRead 回调里处理新的客户端连接了。

/*
NioEventLoop.processSelectedKey() 当Channel有 OP_READ | OP_ACCEPT 事件时调用该方法。
对于服务端Channel来说,就是 OP_ACCEPT.
 */

@Override
public void read() {
 assert eventLoop().inEventLoop();
 final ChannelConfig config = config();
 final ChannelPipeline pipeline = pipeline();
 // 接收对端数据时,ByteBuf的分配策略,基于历史数据动态调整初始化大小,避免太大浪费空间,太小又会频繁扩容
 final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
 allocHandle.reset(config);

 boolean closed = false;
 Throwable exception = null;
 try {
  try {
   do {
  /*
  对于ServerSocketChannel来说,就是接收一个客户端Channel,添加到readBuf
   */

    int localRead = doReadMessages(readBuf);
    if (localRead == 0) {
     break;
    }
    if (localRead < 0) {
     closed = true;
     break;
    }
    // 递增已读取的消息数量
    allocHandle.incMessagesRead(localRead);
   } while (continueReading(allocHandle));
  } catch (Throwable t) {
   exception = t;
  }

  int size = readBuf.size();
  for (int i = 0; i < size; i++) {
   readPending = false;
   /**
    * 通过pipeline传播ChannelRead事件
    * {@link io.netty.bootstrap.ServerBootstrap.ServerBootstrapAcceptor#channelRead(io.netty.channel.ChannelHandlerContext, java.lang.Object)}
    */

   pipeline.fireChannelRead(readBuf.get(i));
  }
  readBuf.clear();
  // 读取完毕的回调,有的Handle会根据本次读取的总字节数,自适应调整下次应该分配的缓冲区大小
  allocHandle.readComplete();
  // 通过pipeline传播ChannelReadComplete事件
  pipeline.fireChannelReadComplete();

  if (exception != null) {// 事件处理异常了
   // 是否需要关闭连接
   closed = closeOnReadError(exception);

   // 通过pipeline传播异常事件
   pipeline.fireExceptionCaught(exception);
  }

  if (closed) {//如果需要关闭,那就关闭
   inputShutdown = true;
   if (isOpen()) {
    close(voidPromise());
   }
  }
 } finally {
  if (!readPending && !config.isAutoRead()) {
   removeReadOp();
  }
 }
}

4. 总结

ServerBootstrapAcceptor 是一个特殊的 ChannelHandler,它是 Netty 服务端用来接收客户端连接的核心类。ServerBootstrap 在初始化 ServerSocketChannel 时,会往它的 Pipeline 中添加 ServerBootstrapAcceptor,ServerBootstrapAcceptor 重写了 ChannelRead 回调,当 NioEventLoop 检测到有OP_ACCEPT事件到达时会执行NioMessageUnsafe.read()方法,它会调用 JDK 底层的 API 接收客户端连接,并把它作为 msg 触发 ChannelRead 回调,这样 ServerBootstrapAcceptor 就可以拿到客户端连接,帮助它进行初始化并注册到 WorkerGroup 中。


原文始发于微信公众号(程序员小潘):ServerBootstrapAcceptor:接收连接的核心

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之家整理,本文链接:https://www.bmabk.com/index.php/post/29446.html

(0)
小半的头像小半

相关推荐

发表回复

登录后才能评论
极客之家——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!