【Netty】ChannelOutboundBuffer源码分析

上一篇文章对 RecvByteBufAllocator 的源码进行了分析,介绍了 Netty 是如何接收对端发送过来的数据。以及 Netty 是如何通过 AdaptiveRecvByteBufAllocator 来自适应调整 ByteBuf 的动态分配,解决 Java ByteBuffer 分配过大浪费内存,分配过小又需要频繁扩容的问题。

本篇文章会分析,Netty 是如何将数据发送出去的。

前置知识

Netty 支持的数据传输类型

首先你需要知道,通过 Netty 来发送数据,它只支持两种数据类型:ByteBuf 和 FileRegion。前者可以看作是 ByteBuffer,普通的字节数据传输。而后者是文件传输,Netty 通过 FileRegion 来实现文件传输的零拷贝。

write 和 flush

write()并不会发送数据,只是简单的将数据暂存到 ChannelOutboundBuffer。flush()才是真正的将数据通过 Socket 传输给对端。writeAndFlush()只是简单的执行以上两个方法而已。

Channel 高低水位线

当程序 write 了大量数据,或者虽然调用了 flush(),但是由于对端来不及接收数据,再或者由于网络原因等等情况,导致 TCP 缓冲区被写满,大量的消息积压在 ChannelOutboundBuffer,导致内存溢出。为了保护你的程序,Netty 给 Channel 设置了「高低水位线」,当积压的消息超过了高水位,Netty 会将 Channel 设为「不可写」状态并触发channelWritabilityChanged回调,你可以通过Channel.isWritable()判断是否要继续写数据。通过ChannelConfig.setWriteBufferHighWaterMark()ChannelConfig.setWriteBufferLowWaterMark()设置 Channel 的高低水位线。

订阅 OP_WRITE 事件

既然 write()操作是用户自己发起的,为啥还要订阅 Channel 的OP_WRITE事件呢?因为 TCP 缓冲区可能被写满,此时你就应该订阅OP_WRITE事件,暂时放弃写操作,等待Selector通知你 Channel 可写时,你再继续写。

ByteBuf 可以转换成 ByteBuffer

Java 原生的 SocketChannel 只支持写入 ByteBuffer,当你通过 Netty 写入 ByteBuf 时,它会将 ByteBuf 转换成 ByteBuffer 再写入,方法是ByteBuf.internalNioBuffer()

清楚 Java 对象在 JVM 中的内存布局

write(msg)时,会将 msg 包装成 Entry 节点加入到链尾,其中一个属性pendingSize记录着消息占用的内存空间,这个空间大小除了 msg 数据本身占用的空间外,还包含 Entry 对象占用的空间,因此默认会额外再加上 96。为啥是 96 后面会说明,首先你应该知道对象的对象头最大占用 16 字节,对象引用最少占用 4 字节,最多占用 8 字节,一个 long 类型占用 8 字节,int 类型占用 4 字节,boolean 类型占用 1 字节。另外 JVM 要求 Java 对象占用的空间必须是 8 字节的整数倍,因此还会有 padding 填充字节。

ChannelHandlerContext.writeAndFlush()分析

如下,分别是发送一个 ByteBuf 和 FileRegion 的简单示例:

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    // 发送一个 hello
    ctx.writeAndFlush(Unpooled.wrappedBuffer("hello".getBytes()));
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    // 传输一个 a.txt 文件
    RandomAccessFile accessFile = new RandomAccessFile("/disk/a.txt""r");
    DefaultFileRegion region = new DefaultFileRegion(accessFile.getChannel(), 0, accessFile.length());
 ctx.writeAndFlush(region);
}

我先说一下 writeAndFlush 的整体流程,实际的发送细节下一节会解释。

调用ctx.writeAndFlush(),会从当前 Handler 往前找能处理 write 事件的 Handler,如果调用的是ctx.channel().writeAndFlush(),则会从 Pipeline 的 TailContext 开始向前找能处理 write 事件的 Handler,事件传播的路径稍微有点区别。默认情况下,会找到 HeadContext 来处理,源码如下:

private void write(Object msg, boolean flush, ChannelPromise promise) {
    // 确保发送的消息不为空
    ObjectUtil.checkNotNull(msg, "msg");
    try {
        if (isNotValidPromise(promise, true)) {
            ReferenceCountUtil.release(msg);
            // cancelled
            return;
        }
    } catch (RuntimeException e) {
        ReferenceCountUtil.release(msg);
        throw e;
    }

    // 往后找能处理 write事件的Channel,默认会找到HeadContext。
    final AbstractChannelHandlerContext next = findContextOutbound(flush ?
            (MASK_WRITE | MASK_FLUSH) : MASK_WRITE);
    final Object m = pipeline.touch(msg, next);
    EventExecutor executor = next.executor();

    // 如果是EventLoop线程,则直接执行,否则提交一个任务串行化执行。
    if (executor.inEventLoop()) {
        if (flush) {
            // 调用的是writeAndFlush(),所有flush为true,这里会调用HeadContext.invokeWriteAndFlush()
            next.invokeWriteAndFlush(m, promise);
        } else {
            next.invokeWrite(m, promise);
        }
    } else {
        final WriteTask task = WriteTask.newInstance(next, m, promise, flush);
        if (!safeExecute(executor, task, promise, m, !flush)) {
            task.cancel();
        }
    }
}

找到 HeadContext 后,调用其invokeWriteAndFlush()方法,其实就是将 write 和 flush 放在一个方法里调用了:

void invokeWriteAndFlush(Object msg, ChannelPromise promise) {
    if (invokeHandler()) {
        // 先通过handler调用write()
        invokeWrite0(msg, promise);
        // 再通过handler调用flush()
        invokeFlush0();
    } else {
        writeAndFlush(msg, promise);
    }
}

先看invokeWrite0(),它会调用HeadContext.write(),由于 write 操作需要和 JDK 的底层 API 交互,于是操作又会被转交给Channel.Unsafe执行:

 @Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
    // 需要和JDK底层API交互,转交给Unsafe执行。
    unsafe.write(msg, promise);
}

接下来会调用AbstractChannel.AbstractUnsafe.write()方法,它首先会对发送的数据做过滤,只支持 ByteBuf 和 FileRegion 两种类型。然后会计算发送的数据占用的内存大小,因为前面说过积压的消息一旦超过 Channel 的高水位线会将 Channel 设为「不可写」状态,防止内存溢出。这两步做完以后,会把消息添加到输出缓冲区 ChannelOutboundBuffer 中。

@Override
public final void write(Object msg, ChannelPromise promise) {
    assertEventLoop();

    ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
    if (outboundBuffer == null) {// outboundBuffer会随着Channel一同被创建,一般不会为null,这里做了校验。
        try {
            ReferenceCountUtil.release(msg);
        } finally {
            safeSetFailure(promise,
                    newClosedChannelException(initialCloseCause, "write(Object, ChannelPromise)"));
        }
        return;
    }

    int size;
    try {
        // 过滤写出消息,确保是ByteBuf或FileRegion,其他对象不支持写出。
        msg = filterOutboundMessage(msg);
        /*
        估算消息占用的内存,作用:
            因为write()不会把消息写出到Socket,会暂存在内存里,直到flush()。
            Netty为了防止消息堆积,会设置高低水位,消息暂存的总量达到最高水位会将Channel设置不可写状态,
            以保护你的程序,避免内存溢出。
            详见:io.netty.channel.DefaultMessageSizeEstimator.HandleImpl.size()

            对于FileRegion,会直接返回0,因为使用了零拷贝技术,不需要把文件读取到JVM进程。
         */

        size = pipeline.estimatorHandle().size(msg);
        if (size < 0) {
            size = 0;
        }
    } catch (Throwable t) {
        try {
            ReferenceCountUtil.release(msg);
        } finally {
            safeSetFailure(promise, t);
        }
        return;
    }

    // write()只会把消息暂存在outboundBuffer,不会真正发送。
    outboundBuffer.addMessage(msg, size, promise);
}

关注一下filterOutboundMessage(),它除了过滤消息,还会试图将 HeapByteBuf 转换成 DirectByteBuf。Netty 为了提升数据发送的效率,和 Socket 直接读写的数据会使用直接内存,避免 IO 操作再发生内存拷贝。

// 过滤出站消息,只支持写出ByteBuf和FileRegion。
@Override
protected final Object filterOutboundMessage(Object msg) {
    if (msg instanceof ByteBuf) {
        ByteBuf buf = (ByteBuf) msg;
        if (buf.isDirect()) {
            return msg;
        }
        // 为了避免内存复制,Socket直接读写的数据都使用堆外内存
        return newDirectBuffer(buf);
    }

    // 文件传输
    if (msg instanceof FileRegion) {
        return msg;
    }
    // 不支持的数据类型,抛异常
    throw new UnsupportedOperationException(
        "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
}

newDirectBuffer()并不保证一定转换成功,如果使用的 ByteBufAllocator 是未池化的,且没有开启io.netty.threadLocalDirectBufferSize,那么就意味着 Netty 需要申请一个没有被池化的 DirectByteBuf,这个操作是非常昂贵的,Netty 会放弃转换:

// 试图将HeapByteBuf转换成DirectByteBuf,如果转换的开销很大会放弃。
protected final ByteBuf newDirectBuffer(ByteBuf buf) {
    final int readableBytes = buf.readableBytes();
    if (readableBytes == 0) {
        // 可读字节数为0,直接释放并返回共享空对象。
        ReferenceCountUtil.safeRelease(buf);
        return Unpooled.EMPTY_BUFFER;
    }

    // 获取Channel绑定的ByteBufAllocator
    final ByteBufAllocator alloc = alloc();
    if (alloc.isDirectBufferPooled()) {// 分配器是否是池化的,且能分配直接内存?
        // 创建一个指定大小的直接内存ByteBuf,将数据写入,原buf释放
        ByteBuf directBuf = alloc.directBuffer(readableBytes);
        directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
        ReferenceCountUtil.safeRelease(buf);
        return directBuf;
    }

    /*
    如果设置了io.netty.threadLocalDirectBufferSize,Netty会在线程的FastThreadLocal中通过Stack实现一个轻量级的
    ByteBuf对象池,ByteBuf写出到Socket后,会自动释放,这里会将它再push到线程绑定的Stack中进行重用。
     */

    final ByteBuf directBuf = ByteBufUtil.threadLocalDirectBuffer();
    if (directBuf != null) {
        directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
        ReferenceCountUtil.safeRelease(buf);
        return directBuf;
    }

    // 申请一个未池化的直接内存ByteBuf开销是很大的,测试过,比堆内存的申请慢10倍都不止,这里会直接放弃。
    return buf;
}

如果设置了io.netty.threadLocalDirectBufferSize,Netty 会为每个线程创建指定数量的 ByteBuf 对象缓存,这些 ByteBuf 是可以被重用的。实现逻辑是 Netty 会在 FastThreadLocal 存放一个 Stack,需要时pop()一个出来,用完时push()归还。

再来关注一下MessageSizeEstimator,它负责计算待发送数据占用的内存,逻辑很简单,对于 FileRegion 会返回 0,因为 FileRegion 传输文件时使用了零拷贝技术,直接使用 mmap 内存映射,而不需要将文件加载到 JVM 进程,实现直接看io.netty.channel.DefaultMessageSizeEstimator.HandleImpl.size()

// 估算消息的内存占用,逻辑还是很简单的。
@Override
public int size(Object msg) {
    if (msg instanceof ByteBuf) {
        return ((ByteBuf) msg).readableBytes();
    }
    if (msg instanceof ByteBufHolder) {
        return ((ByteBufHolder) msg).content().readableBytes();
    }
    // FileRegion实现了零拷贝,并不需要将文件加载到JVM,因此占用的内存为0,不影响Channel水位线。
    if (msg instanceof FileRegion) {
        return 0;
    }
    return unknownSize;
}

关于 ChannelOutboundBuffer 代码下节会详细分析,这里只需要知道 write()只会将数据暂存到 ChannelOutboundBuffer,而不会真正发送就行了。

消息存入 ChannelOutboundBuffer,write 操作就算完成了。紧接着会调用invokeFlush0(),它依然会转交给 Unsafe 执行,调用AbstractChannel.AbstractUnsafe.flush()。它会做两件事:先把 ChannelOutboundBuffer 中待发送的 Entry 标记为flushed,然后将要发送的 Entry 数据转换成 Java 的 ByteBuffer,使用 SocketChannel 进行真正的数据发送。

@Override
public final void flush() {
    assertEventLoop();

    // 得到SocketChannel绑定的ChannelOutboundBuffer
    ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
    if (outboundBuffer == null) {
        return;
    }
    // 先将unflushed节点标记为flushed
    outboundBuffer.addFlush();
    // 开始发送数据
    flush0();
}

flush0()会开始发送数据,它首先会检测 Channel 是否活跃,如果是非活跃状态,此次 flush()操作将失败,Entry 会被移除。如果 Channel 正常,会调用doWrite()进行数据发送。

protected void flush0() {
    if (inFlush0) {// 避免上一次flush0()还没执行完时,又触发了
        return;
    }

    final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
    if (outboundBuffer == null || outboundBuffer.isEmpty()) {// 非空校验
        return;
    }

    inFlush0 = true;

    // 如果连接已经失活了。
    if (!isActive()) {
        try {
            if (!outboundBuffer.isEmpty()) {
                if (isOpen()) {
                    /*
                    通道是打开的,稍后可能会被激活。
                        1.释放msg
                        2.触发失败通知
                        3.回收Entry
                        4.递减消息挂起的字节数
                     */

                    outboundBuffer.failFlushed(new NotYetConnectedException(), true);
                } else {
                    /*
                    道都被关闭了,和上面的处理流程类似,只是不用通过触发channelWritabilityChanged()回调了。
                     */

                    outboundBuffer.failFlushed(newClosedChannelException(initialCloseCause, "flush0()"), false);
                }
            }
        } finally {
            inFlush0 = false;
        }
        return;
    }

    try {
        // 连接正常,执行真正的write()操作
        doWrite(outboundBuffer);
    } catch (Throwable t) {
        handleWriteError(t);
    } finally {
        inFlush0 = false;
    }
}

doWrite()是数据发送的核心,由子类实现,这里直接看NioSocketChannel.doWrite()。它会获取 Java 原生的 SocketChannel,将队列中待发送的 ByteBuf 转换成 ByteBuffer,然后循环发送数据。单次循环发送的数据量受以下两个条件限制:

  1. ByteBuffer 的数量限制。
  2. TCP 参数设置的缓冲区的大小限制(ChannelOption.SO_SNDBUF)。


如果 ChannelOutboundBuffer 积压了大量的数据,单次可能无法发送完,因此会默认循环发 16 次。循环次数过多可能会阻塞 IO 线程,导致其他 Channel 的事件得不到处理。

@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
    SocketChannel ch = javaChannel();
    // 循环写的次数,默认16次。可能有大量消息积压在输出缓冲区,同时为了避免阻塞IO线程,做了次数限制。
    int writeSpinCount = config().getWriteSpinCount();
    do {
        if (in.isEmpty()) {// 没有数据要写出了
            // 取消监听 OP_WRITE事件
            clearOpWrite();
            return;
        }

        // 发送缓冲区的最大值,由 TCP参数: ChannelOption.SO_SNDBUF 设置。
        int maxBytesPerGatheringWrite = ((NioSocketChannelConfig) config).getMaxBytesPerGatheringWrite();

        /*
        将要flush的Entry转换成Java原生的ByteBuffer数组,因为做了总数和总字节数的限制,所以一次可能无法send所有数据。
        注意,这里只会处理ByteBuf,不会处理FileRegion。
         */

        ByteBuffer[] nioBuffers = in.nioBuffers(1024, maxBytesPerGatheringWrite);
        // 其实就是 nioBuffers.length,上一步方法中会进行设置
        int nioBufferCnt = in.nioBufferCount();

        switch (nioBufferCnt) {
            case 0:
                // ByteBuf处理完了,但是可能还有FileRegion需要处理。
                writeSpinCount -= doWrite0(in);
                break;
            case 1: {
                // 只有单个ByteBuf需要发送的情况
                ByteBuffer buffer = nioBuffers[0];
                // 尝试发送的字节数
                int attemptedBytes = buffer.remaining();
                // Java原生的SocketChannel.wrote(ByteBuffer)发送数据
                final int localWrittenBytes = ch.write(buffer);
                if (localWrittenBytes <= 0) {// TCP缓冲区满,订阅OP_WRITE事件,等待可写时再继续处理
                    incompleteWrite(true);
                    return;
                }
                // 动态调整 发送缓冲区大小
                adjustMaxBytesPerGatheringWrite(attemptedBytes, localWrittenBytes, maxBytesPerGatheringWrite);
                // 删除已经发送的Entry节点
                in.removeBytes(localWrittenBytes);
                --writeSpinCount;
                break;
            }
            default: {
                // 发送缓冲区有多个ByteBuf待发送
                // 尝试发送的字节总数
                long attemptedBytes = in.nioBufferSize();
                // 调用Java原生的SocketChannel.write()进行数据的发送,返回值是实际发送的字节数
                final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);
                if (localWrittenBytes <= 0) {
                    // 写入字节为0,可能是TCP缓冲区满了,订阅OP_WRITE事件,等待TCP可写时再执行。
                    incompleteWrite(true);
                    return;
                }
                // 根据本次实际写入的字节数,动态调整发送缓冲区:ChannelOption.SO_SNDBUF
                adjustMaxBytesPerGatheringWrite((int) attemptedBytes, (int) localWrittenBytes,
                        maxBytesPerGatheringWrite);
                /*
                删除已经发送的数据,是根据实际写入的字节数去删除的,而不是根据ByteBuf的数量。
                从flushedEntry开始,计算每个ByteBuf的大小,按个删除。
                可能存在某个ByteBuf发送部分数据的情况,会调整它的readerIndex。
                 */

                in.removeBytes(localWrittenBytes);
                --writeSpinCount;
                break;
            }
        }
    } while (writeSpinCount > 0);

    /*
    只有在 nioBufferCnt处理完了,调用doWrite0(in)处理FileRegion,且没处理完毕时,才可能走到这里。
    如果FileRegion没有处理完,writeSpinCount会小于0,这里会继续订阅OP_WRITE事件,等待Channel可写时继续处理。
     */

    incompleteWrite(writeSpinCount < 0);
}

此外,NioSocketChannel.doWrite()只会发送 ByteBuf,FileRegion 的发送需要调用父类的AbstractNioByteChannel.doWrite0()处理。

/*
NioSocketChannel只负责发送ByteBuf,
FileRegion的发送这边会处理。
 */

protected final int doWrite0(ChannelOutboundBuffer in) throws Exception {
    Object msg = in.current();
    if (msg == null) {
        // Directly return here so incompleteWrite(...) is not called.
        return 0;
    }
    // 数据发送
    return doWriteInternal(in, in.current());
}

private int doWriteInternal(ChannelOutboundBuffer in, Object msg) throws Exception {
    if (msg instanceof ByteBuf) {
        ByteBuf buf = (ByteBuf) msg;
        if (!buf.isReadable()) {
            // 没有数据可读,直接删除节点
            in.remove();
            return 0;
        }

        // Java底层write()
        final int localFlushedAmount = doWriteBytes(buf);
        if (localFlushedAmount > 0) {
            in.progress(localFlushedAmount);
            if (!buf.isReadable()) {
                in.remove();
            }
            return 1;
        }
    } else if (msg instanceof FileRegion) {
        FileRegion region = (FileRegion) msg;
        // 已经传输的字节数 >= 字节总数,代表文件已经传输完毕,删除节点。
        if (region.transferred() >= region.count()) {
            in.remove();
            return 0;
        }
        // 调用region.transferTo(javaChannel(), position)文件传输
        long localFlushedAmount = doWriteFileRegion(region);
        if (localFlushedAmount > 0) {// 实际发送的字节数
            in.progress(localFlushedAmount);
            if (region.transferred() >= region.count()) {//FileRegion发送完毕,移除节点
                in.remove();
            }
            return 1;
        }
    } else {
        // Should not reach here.
        throw new Error();
    }
    /*
    一般不会走到这里,可能是数据没有处理完毕,返回一个Integer.MAX_VALUE,
    让writeSpinCount小于0,这样它就会订阅OP_WRITE事件,等待Channel可写时继续处理。
     */

    return WRITE_STATUS_SNDBUF_FULL;
}

需要注意的是,flush()操作可能存在两种情况:

  1. 数据正常发送完毕。
  2. 数据没有发完,就已经超过了最大循环次数,为了不阻塞 IO 线程,下次再处理。
  3. TCP 缓冲区满,数据无法发送。

对于后面两种情况,都属于「不完整写入」,因此会调用incompleteWrite(setOpWrite)稍后继续处理。针对第三种情况,Netty 需要订阅OP_WRITE事件,等待Selector通知 Channel 可写时继续发送数据。setOpWrite参数代表是否要监听OP_WRITE事件:

/**
 * 不完整写入
 * @param setOpWrite 是否要订阅OP_WRITE事件
 */

protected final void incompleteWrite(boolean setOpWrite) {
    // setOpWrite为true,一般都是TCP缓冲区满了,此时需要订阅OP_WRITE事件,等待Channel可写时再继续处理。
    if (setOpWrite) {
        // 订阅OP_WRITE事件
        setOpWrite();
    } else {
        // 取消订阅OP_WRITE事件
        clearOpWrite();

        // 提交一个flush任务,稍后执行,避免阻塞IO线程。
        eventLoop().execute(flushTask);
    }
}

至此,writeAndFlush()的整个流程就处理完了,对于 ChannelOutboundBuffer 本节没有进行分析,看下节。

ChannelOutboundBuffer 源码分析

ChannelOutboundBuffer 是 Netty 的数据发送缓冲区,它跟随 SocketChannel 一同被创建。

先看属性:

/*
将ByteBuf包装成Entry时,额外占用的字节大小,因为除了ByteBuf本身的数据外,Entry对象也占用空间。
为啥是96?为啥还支持修改??
    1.96是Netty根据64位的JVM计算的最大值。
    2.如果你的程序运行在32位的JVM中,或者对象引用开启了压缩,你可以根据实际情况修改这个值。

分析为啥最多会占用96字节:
    在64位的JVM中,一个Entry对象占用以下空间:
      - 16字节的对象头空间:8字节Mark Word,最大8字节Klass Pointer
      - 6个对象引用属性,最小4*6=24字节,最大8*6=48字节
      - 2个long属性,2*8=16字节
      - 2个int属性,2*4=8字节
      - 1个boolean属性,1字节
      - padding对齐填充,JVM要求对象占用的内存为8字节的整数倍,这里是7字节
     合计最多为96字节.
 */

static final int CHANNEL_OUTBOUND_BUFFER_ENTRY_OVERHEAD =
        SystemPropertyUtil.getInt("io.netty.transport.outboundBufferEntrySizeOverhead"96);

/*
发送数据时,需要将待发送的ByteBuf转换成ByteBuffer,考虑到write是个很频繁的操作,
为了避免频繁创建数组,这里进行了复用,每个线程会复用自己的ByteBuffer[]。
 */

private static final FastThreadLocal<ByteBuffer[]> NIO_BUFFERS = new FastThreadLocal<ByteBuffer[]>() {
    @Override
    protected ByteBuffer[] initialValue() throws Exception {
        // 默认大小为1024,后面有必要还会扩容
        return new ByteBuffer[1024];
    }
};

// 绑定的SocketChannel
private final Channel channel;

// 已经flush,等待发送的头节点。
private Entry flushedEntry;

// 已经write但是没flush的头节点,flush()时会通过它一直往后找
private Entry unflushedEntry;

// 链尾节点
private Entry tailEntry;

// flush的节点数量,发送数据时会从flushedEntry开始往后找flushed个节点。
private int flushed;
// 单次循环写的Nio Buffer数量
private int nioBufferCount;
// 单次循环写的Nio Buffer总大小
private long nioBufferSize;
// flush是否失败
private boolean inFail;

// 计算totalPendingSize属性的偏移量,通过CAS的方式来做修改。
private static final AtomicLongFieldUpdater<ChannelOutboundBuffer> TOTAL_PENDING_SIZE_UPDATER =
        AtomicLongFieldUpdater.newUpdater(ChannelOutboundBuffer.class, "totalPendingSize");

// 输出缓冲区暂存的消息占用的总内存,通过该值判断是否达到高低水位,以修改Channel的可写状态。
@SuppressWarnings("UnusedDeclaration")
private volatile long totalPendingSize;

// unwritable属性的偏移量,通过CAS的方式来修改。
private static final AtomicIntegerFieldUpdater<ChannelOutboundBuffer> UNWRITABLE_UPDATER =
        AtomicIntegerFieldUpdater.newUpdater(ChannelOutboundBuffer.class, "unwritable");

// Channel是否可写的状态,0可写,1不可写。输出缓冲区内存达到高低水位线时修改。
@SuppressWarnings("UnusedDeclaration")
private volatile int unwritable;

// Channel可写状态变更时触发的任务,消息的积压达到高低水位线时触发
private volatile Runnable fireChannelWritabilityChangedTask;

它本身是一个单向链表,由一系列 Entry 节点组成。它有三个节点指针:

  • flushedEntry:已经 flush,等待被发送的起始节点指针。
  • unflushedEntry:已经 write,等待 flush 的起始节点指针。
  • tailEntry:链尾指针。

笔者花了一个简图,来表示它是如何工作的:【Netty】ChannelOutboundBuffer源码分析上节说过,执行flush(msg)操作时,只是把数据暂存到 ChannelOutboundBuffer,核心方法是addMessage(),它主要做了两件事:

  1. 将 msg 封装成 Entry 节点,加入到链尾。
  2. 统计输出缓冲区的消息总字节数是否达到高水位线,如果达到则将 Channel 设为「不可写」状态,且触发ChannelWritabilityChanged回调。
/**
 * 将消息暂存到ChannelOutboundBuffer,暂存成功promise就会收到通知。
 * @param msg 待发送的数据:ByteBuf/FileRegion
 * @param size 数据占用的内存大小
 * @param promise write成功会收到通知
 */

public void addMessage(Object msg, int size, ChannelPromise promise) {
    // 将msg包装成一个Entry,并加入到链尾。
    Entry entry = Entry.newInstance(msg, size, total(msg), promise);
    if (tailEntry == null) {
        flushedEntry = null;
    } else {
        // tail不为空,则添加到它的next
        Entry tail = tailEntry;
        tail.next = entry;
    }
    tailEntry = entry;// tailEntry指向新添加的节点
    if (unflushedEntry == null) {
        unflushedEntry = entry;
    }

    // 统计消息挂起的字节数,如果超过高水位线,需要修改Channel的可写状态并触发回调
    incrementPendingOutboundBytes(entry.pendingSize, false);
}

先看Entry.newInstance(),它会将 msg 封装成 Entry 节点,加入到链尾。Entry 有个属性pendingSize用来记录消息占用的内存空间,需要注意的是,它除了 msg 本身的数据空间,还会加上 Entry 对象占用的空间,一个 Java 对象占用多少空间是在编译期就确定下来的,除了属性占用的空间外,读者还需要了解 Java 对象的内存布局。

/**
 * 创建一个Entry节点,从对象池中取一个
 * @param msg 消息本身
 * @param size 由MessageSizeEstimator估算出消息占用的内存大小
 * @param total 消息本身的大小(区别是FileRegion的处理)
 * @param promise write()完成会收到通知
 * @return
 */

static Entry newInstance(Object msg, int size, long total, ChannelPromise promise) {
    /*
    每次write()都需要一个Entry,考虑到write()是一个非常频繁的操作,
    为了避免Entry的频繁创建和销毁,这里做了对象池的重用处理。
     */

    Entry entry = RECYCLER.get();
    entry.msg = msg;
    /*
    占用的内存为什么还要加上CHANNEL_OUTBOUND_BUFFER_ENTRY_OVERHEAD???
        除了ByteBuf占用的空间外,Entry本身也占用空间啊。
         在64位的JVM中:
          - 16字节的对象头空间:8字节Mark Word,最大8字节Klass Pointer
          - 6个对象引用属性,最小4*6=24字节,最大8*6=48字节
          - 2个long属性,2*8=16字节
          - 2个int属性,2*4=8字节
          - 1个boolean属性,1字节
          - padding对齐填充,JVM要求对象占用的内存为8字节的整数倍,这里是7字节
         合计最多为96字节,因此CHANNEL_OUTBOUND_BUFFER_ENTRY_OVERHEAD的默认值即使96。
     */

    entry.pendingSize = size + CHANNEL_OUTBOUND_BUFFER_ENTRY_OVERHEAD;
    entry.total = total;
    entry.promise = promise;
    return entry;
}

Entry 加入链表后,incrementPendingOutboundBytes()会累加字节总数,判断是否超过高水位线:

/**
 * 统计消息挂起的字节数,如果超过高水位线,需要修改Channel的可写状态并触发回调
 * @param size 消息占用内存的字节数
 * @param invokeLater 是否稍后触发再回调
 */

private void incrementPendingOutboundBytes(long size, boolean invokeLater) {
    if (size == 0) {
        return;
    }
    // 统计消息暂存的内存大小,CAS的方式进行累加
    long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
    // 暂存消息达到高水位线,消息积压了
    if (newWriteBufferSize > channel.config().getWriteBufferHighWaterMark()) {
        // 修改可写状态,触发回调
        setUnwritable(invokeLater);
    }
}

setUnwritable()会在数据总字节数超过高水位线时触发,它会通过自旋+CAS 的方式将unwritable从 0 改为 1,然后触发回调:

// 将Channel设为不可写,CAS执行
private void setUnwritable(boolean invokeLater) {
    for (;;) {
        final int oldValue = unwritable;
        final int newValue = oldValue | 1;
        if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
            if (oldValue == 0) {
                // CAS操作成功,触发回调
                fireChannelWritabilityChanged(invokeLater);
            }
            break;
        }
    }
}

ChannelOutboundBuffer 为 write 操作所做的工作就这么多了,下面看 flush。

执行 flush 操作时,首先会调用outboundBuffer.addFlush()unflushed节点标记为flushed,其实就是移动flushedEntryunflushedEntry指针,这个过程会检查 Entry 节点是否被取消,如果取消了会跳过节点,同时会递减该 Entry 占用的内存空间。

// 只是将节点标记为flushed,并没有真正发送数据,会跳过已经被取消的节点。
public void addFlush() {
    Entry entry = unflushedEntry;
    if (entry != null) {
        if (flushedEntry == null) {
            flushedEntry = entry;
        }
        do {
            flushed ++;
            // 将entry的promise设为 不可取消 状态
            if (!entry.promise.setUncancellable()) {
                // 设置失败,说明promise已经取消,需要释放消息,并递减挂起的字节数
                int pending = entry.cancel();
                // 递减缓冲区的消息字节总数,如果达到低水位线,则将Channel重新设为「可写」状态,并触发回调
                decrementPendingOutboundBytes(pending, falsetrue);
            }
            entry = entry.next;
        } while (entry != null);// 不断往后找 待flush的节点

        // 所有的节点都flush了,置为空
        unflushedEntry = null;
    }
}

节点状态标记完成后,会调用doWrite()开始写数据。首先它需要 ChannelOutboundBuffer 将flushed节点转换成 Java 原生的 ByteBuffer,方法是nioBuffers()。因为 OS 对SocketChannel.write()单次发送的字节数有限制,一般是Integer.MAX_VALUE,所以单次转换需要提供两个参数:

  • maxCount:转换 ByteBuffer 的最大数量,默认是 1024。
  • maxBytes:最大字节数,默认是设置的 TCP 发送缓冲区大小(ChannelOption.SO_SNDBUF)。
/**
 * 将要flush的Entry转换成Java原生的ByteBuffer数组,因为做了总数和总字节数的限制,所以一次可能无法send所有数据。
 * @param maxCount 单次发送的ByteBuffer最大数量
 * @param maxBytes 发送缓冲区的最大值,由 TCP参数: ChannelOption.SO_SNDBUF 设置。
 * @return
 */

public ByteBuffer[] nioBuffers(int maxCount, long maxBytes) {
    assert maxCount > 0;
    assert maxBytes > 0;
    long nioBufferSize = 0;
    int nioBufferCount = 0;
    // 由于write操作很频繁,避免ByteBuffer[]频繁创建和销毁,这里进行了复用,每个线程都有一个ByteBuffer[1024]
    final InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
    ByteBuffer[] nioBuffers = NIO_BUFFERS.get(threadLocalMap);

    Entry entry = flushedEntry;
    // 确保Entry节点是flushed,且msg是ByteBuf类型
    while (isFlushedEntry(entry) && entry.msg instanceof ByteBuf) {
        // 确保节点没有被取消,如果取消了,则跳过它。
        if (!entry.cancelled) {
            ByteBuf buf = (ByteBuf) entry.msg;
            final int readerIndex = buf.readerIndex();
            final int readableBytes = buf.writerIndex() - readerIndex;

            // 可读字节数就是要写出的字节数,确保大于0
            if (readableBytes > 0) {
                if (maxBytes - readableBytes < nioBufferSize && nioBufferCount != 0) {
                    // 发送的数据超过了maxBytes,退出循环
                    break;
                }
                nioBufferSize += readableBytes;
                int count = entry.count;
                if (count == -1) {
                    // -1代表没有设置ByteBuf的nioBufferCount,ByteBuf中ByteBuffer的数量
                    entry.count = count = buf.nioBufferCount();
                }
                // 是否需要更多的空间
                int neededSpace = min(maxCount, nioBufferCount + count);
                // 如果ByteBuffer的数量超过了默认值1024,就去申请更多的空间
                if (neededSpace > nioBuffers.length) {
                    // 成倍扩容,直到数组长度足够。并塞回到FastThreadLocal。
                    nioBuffers = expandNioBufferArray(nioBuffers, neededSpace, nioBufferCount);
                    NIO_BUFFERS.set(threadLocalMap, nioBuffers);
                }
                if (count == 1) {
                    ByteBuffer nioBuf = entry.buf;
                    if (nioBuf == null) {
                        // 将ByteBuf转ByteBuffer,且缓存到Entry中
                        entry.buf = nioBuf = buf.internalNioBuffer(readerIndex, readableBytes);
                    }
                    // 设置ByteBuffer
                    nioBuffers[nioBufferCount++] = nioBuf;
                } else {
                    // 一个ByteBuf包含多个ByteBuffer的情况处理,循环遍历设置
                    nioBufferCount = nioBuffers(entry, buf, nioBuffers, nioBufferCount, maxCount);
                }
                if (nioBufferCount >= maxCount) {
                    // ByteBuffer的数量超过了maxCount,退出循环
                    break;
                }
            }
        }
        entry = entry.next;
    }
    this.nioBufferCount = nioBufferCount;
    this.nioBufferSize = nioBufferSize;

    return nioBuffers;
}

这里关注一下NIO_BUFFERS属性,它是一个 FastThreadLocal,每个线程都有自己的 ByteBuffer[]缓存,默认长度 1024,可以被复用。这里为什么要复用呢?因为作为一个网络 IO 框架,flush 肯定是一个非常频繁的操作,为了避免每次都创建 ByteBuffer[],复用可以提升系统性能,减轻 GC 的压力。

如果一个 ByteBuf 由很多个 ByteBuffer 组成,默认的 1024 个 ByteBuffer 可能不够用,此时会调用expandNioBufferArray()进行扩容:

// 对array进行扩容
private static ByteBuffer[] expandNioBufferArray(ByteBuffer[] array, int neededSpace, int size) {
    int newCapacity = array.length;
    do {
        // 成倍扩容
        newCapacity <<= 1;

        if (newCapacity < 0) {// int溢出
            throw new IllegalStateException();
        }

    } while (neededSpace > newCapacity);

    ByteBuffer[] newArray = new ByteBuffer[newCapacity];
    // 元素迁移
    System.arraycopy(array, 0, newArray, 0, size);

    return newArray;
}

将待发送的 ByteBuf 转换成 ByteBuffer 后,NioSocketChannel 会调用 JDK 底层的 SocketChannel.write()进行真正的数据发送。

数据发送完毕后,需要移除 ChannelOutboundBuffer 中的节点。节点的添加是从链尾开始,移除则是从链头开始的。ChannelOutboundBuffer 是根据实际发送的字节数来移除节点的,因此会存在某个 ByteBuf 只发送了部分数据的情况,如果某个 ByteBuf 数据没有发送完,那么该节点并不会被移除,只会调整它的readerIndex索引,下次继续发送剩余数据。

/**
 * 根据写入到TCP缓冲区的字节数来移除ByteBuf。
 * @param writtenBytes
 */

public void removeBytes(long writtenBytes) {
    for (;;) {
        // 从flushedEntry开始计算
        Object msg = current();
        if (!(msg instanceof ByteBuf)) {
            assert writtenBytes == 0;
            break;
        }

        final ByteBuf buf = (ByteBuf) msg;
        final int readerIndex = buf.readerIndex();
        final int readableBytes = buf.writerIndex() - readerIndex;
        // 如果单个ByteBuf的数据 <= writtenBytes,则直接移除Entry节点
        if (readableBytes <= writtenBytes) {
            if (writtenBytes != 0) {
                progress(readableBytes);
                writtenBytes -= readableBytes;
            }
            remove();
        } else { // readableBytes > writtenBytes
            // 存在某个ByteBuf发送部分数据的情况,调整它的readerIndex,下次继续发送。
            if (writtenBytes != 0) {
                buf.readerIndex(readerIndex + (int) writtenBytes);
                progress(writtenBytes);
            }
            break;
        }
    }
    // 重置 NIO_BUFFERS
    clearNioBuffers();
}


至此,Netty 的数据发送核心流程全部分析结束。

总结

为了避免每次 write 都将数据写入 TCP 缓冲区,Netty 的 Channel 提供了两种操作:write 和 flush,这需要依赖一个核心类 ChannelOutboundBuffer。write 只是将数据暂存到缓冲区,flush 才是发送数据。同时为了避免消息积压的太多导致 OOM,Netty 提供了高低水位线,当暂存的消息到达高水位时,Netty 会将 Channel 设为「不可写」,同时触发回调,用户可以根据该状态判断是否需要继续写入消息。

ChannelOutboundBuffer 本身是个单向链表,负责管理暂存的消息,当需要发送数据时,它还会负责将 ByteBuf 转换成 ByteBuffer,因为 JDK 底层的 SocketChannel 只支持写入 ByteBuffer。

数据发送完毕后,ChannelOutboundBuffer 还要负责根据实际发送的字节数来移除 Entry 节点,因为存在某个 ByteBuf 只发送了部分数据的情况,针对这种特殊情况,ChannelOutboundBuffer 不会将节点移除,而是调整它的readerIndex索引,下次继续发送剩余数据。


原文始发于微信公众号(程序员小潘):【Netty】ChannelOutboundBuffer源码分析

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/29380.html

(0)
小半的头像小半

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!