Protobuf3 + Netty4: 在socket上传输多种类型的protobuf数据

勤奋不是嘴上说说而已,而是实际的行动,在勤奋的苦度中持之以恒,永不退却。业精于勤,荒于嬉;行成于思,毁于随。在人生的仕途上,我们毫不迟疑地选择勤奋,她是几乎于世界上一切成就的催产婆。只要我们拥着勤奋去思考,拥着勤奋的手去耕耘,用抱勤奋的心去对待工作,浪迹红尘而坚韧不拔,那么,我们的生命就会绽放火花,让人生的时光更加的闪亮而精彩。

导读:本篇文章讲解 Protobuf3 + Netty4: 在socket上传输多种类型的protobuf数据,希望对大家有帮助,欢迎收藏,转发!站点地址:www.bmabk.com,来源:原文

Protobuf序列化的字节流数据是不能自描述的,当我们通过socket把数据发送到Client时,Client必须知道发送的是什么类型的数据,才能正确的反序列化它。这严重影响限制了C/S功能的实现,不解决的话信道事实上只能传输一种类型的数据。本文讲解一下我用的解决办法,虽然我觉得应该有官方的实现更合理,即原生支持Protobuf的自描述。

(在金融领域,有一个叫FAST的协议,基本原理和Protobuf相同,并且有更高的压缩率,并且序列化后的字节流是自描述的,可以自动反序列化为对应的模板的数据(模板相当于.proto文件),但是时间效率比protobuf差,大家也可以关注一下。)

解决方案一
首先,介绍另外一种实现,在protobuf官方wiki中描述的一种workaround,通过定义一种用于自描述的类型:

复制代码
message SelfDescribingMessage {
// Set of .proto files which define the type.
required FileDescriptorSet proto_files = 1;

// Name of the message type. Must be defined by one of the files in
// proto_files.
required string type_name = 2;

// The message data.
required bytes message_data = 3;
}
复制代码
(参考:https://developers.google.com/protocol-buffers/docs/techniques#self-description)

把实际要传输的类型的字节数组放在message_data字段中,用proto_files和type_name字段来描述它的proto文件和类型。这样,信道上传输的都是SelfDescribingMessage类型,但是其上的负载可以是任何类型的数据。

我没有试过这种方式。我不太愿意使用这种方式的原因是,很显然,这样做需要进行2次序列化和2次反序列化,byte数组也要被创建2次。如果对应时延和性能敏感的系统,这样做不够好。

解决方案二
今天主要要介绍的方案。在protobuf序列化的前面,加上一个自定义的头,这个头包含序列化的长度和它的类型。在解压的时候根据包头来反序列化。

假设socket上要传输2个类型的数据,股票行情信息和期权行情信息:

股票的.proto定义:

复制代码
syntax = “proto3”;

package test.model.protobuf;

option java_package = “test.model.protobuf”;

message StockTick {
string stockId = 1;
int price = 2;
}
复制代码

期权的.proto定义:

复制代码
syntax = “proto3”;

package test.model.protobuf;

option java_package = “test.model.protobuf”;

message OptionTick {
string optionId = 1;
string securityId = 2;
int price = 3;
}
复制代码

netty4官方事实上已经实现了protobuf的编解码的插件,但是只能用于传输单一类型的protobuf序列化。我这里截取一段netty代码,熟悉netty的同学马上就能理解它的作用:

复制代码
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new ProtobufVarint32FrameDecoder());
pipeline.addLast(new ProtobufDecoder(StockTickOuterClass.StockTick.getDefaultInstance()));
pipeline.addLast(new ProtobufVarint32LengthFieldPrepender());
pipeline.addLast(new ProtobufEncoder());
pipeline.addLast(new CustomProtoServerHandler());
}
复制代码

看以上代码高亮部分,netty4官方的编解码器必须指定单一的protobuf类型才行。具体每个类的作用:

ProtobufEncoder:用于对Probuf类型序列化。
ProtobufVarint32LengthFieldPrepender:用于在序列化的字节数组前加上一个简单的包头,只包含序列化的字节长度。
ProtobufVarint32FrameDecoder:用于decode前解决半包和粘包问题(利用包头中的包含数组长度来识别半包粘包)
ProtobufDecoder:反序列化指定的Probuf字节数组为protobuf类型。

我们可以参考以上官方的编解码代码,将实现我们客户化的protobuf编解码插件,但是要支持多种不同类型protobuf数据在一个socket上传输:

编码器CustomProtobufEncoder:
复制代码
import com.google.protobuf.MessageLite;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;

/**

  • 参考ProtobufVarint32LengthFieldPrepender 和 ProtobufEncoder
    */
    @Sharable
    public class CustomProtobufEncoder extends MessageToByteEncoder {

HangqingEncoder hangqingEncoder;

public CustomProtobufEncoder(HangqingEncoder hangqingEncoder)
{
    this.hangqingEncoder = hangqingEncoder;
}

@Override
protected void encode(
        ChannelHandlerContext ctx, MessageLite msg, ByteBuf out) throws Exception {
    

    byte[] body = msg.toByteArray();
    byte[] header = encodeHeader(msg, (short)body.length);
    
    out.writeBytes(header);
    out.writeBytes(body);
    
    return;
}

private byte[] encodeHeader(MessageLite msg, short bodyLength) {
    byte messageType = 0x0f;
    
    if (msg instanceof StockTickOuterClass.StockTick) {
        messageType = 0x00;
    } else if (msg instanceof OptionTickOuterClass.OptionTick) {
        messageType = 0x01;
    }
    
    byte[] header = new byte[4];
    header[0] = (byte) (bodyLength & 0xff);
    header[1] = (byte) ((bodyLength >> 8) & 0xff);
    header[2] = 0; // 保留字段
    header[3] = messageType;

    return header;

}

}
复制代码

CustomProtobufEncoder序列化传入的protobuf类型,并且为它创建了一个4个字节的包头,格式如下

body长度(low) body长度
(high) 保留字节 类型

其中的encodeHeader方法具体的实现要根据你要传输哪些protobuf类型来修改代码,也可以稍加设计避免使用太多的if…else。

解码器CustomProtobufDecoder:
复制代码
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import java.util.List;
import com.google.protobuf.MessageLite;

/**

  • 参考ProtobufVarint32FrameDecoder 和 ProtobufDecoder
    */

public class CustomProtobufDecoder extends ByteToMessageDecoder {

@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
    while (in.readableBytes() > 4) { // 如果可读长度小于包头长度,退出。
        in.markReaderIndex();

        // 获取包头中的body长度
        byte low = in.readByte();
        byte high = in.readByte();
        short s0 = (short) (low & 0xff);
        short s1 = (short) (high & 0xff);
        s1 <<= 8;
        short length = (short) (s0 | s1);

        // 获取包头中的protobuf类型
        in.readByte();
        byte dataType = in.readByte();

        // 如果可读长度小于body长度,恢复读指针,退出。
        if (in.readableBytes() < length) {
            in.resetReaderIndex();
            return;
        }

        // 读取body
        ByteBuf bodyByteBuf = in.readBytes(length);

        byte[] array;
        int offset;

        int readableLen= bodyByteBuf.readableBytes();
        if (bodyByteBuf.hasArray()) {
            array = bodyByteBuf.array();
            offset = bodyByteBuf.arrayOffset() + bodyByteBuf.readerIndex();
        } else {
            array = new byte[readableLen];
            bodyByteBuf.getBytes(bodyByteBuf.readerIndex(), array, 0, readableLen);
            offset = 0;
        }
        
        //反序列化
        MessageLite result = decodeBody(dataType, array, offset, readableLen);
        out.add(result);
    }
}

public MessageLite decodeBody(byte dataType, byte[] array, int offset, int length) throws Exception {
    if (dataType == 0x00) {
        return StockTickOuterClass.StockTick.getDefaultInstance().
                getParserForType().parseFrom(array, offset, length);

    } else if (dataType == 0x01) {
        return OptionTickOuterClass.OptionTick.getDefaultInstance().
                getParserForType().parseFrom(array, offset, length);
    }

    return null; // or throw exception
}

}
复制代码

CustomProtobufDecoder实现了2个功能,1)通过包头中的长度信息来解决半包和粘包。 2)把消息body反序列化为对应的protobuf类型(根据包头中的类型信息)。

其中的decodeBody方法具体的实现要根据你要传输哪些protobuf类型来修改代码,也可以稍加设计避免使用太多的if…else。

在Netty服务器上应用编解码器
如何把我们自定义的编解码用于netty Server:

复制代码
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(“decoder”,new CustomProtobufDecoder());
pipeline.addLast(“encoder”,new CustomProtobufEncoder());
pipeline.addLast(new CustomProtoServerHandler());
}
复制代码

Binhua Liu原创文章,转载请注明原地址http://www.cnblogs.com/Binhua-Liu/p/5577622.html

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/140838.html

(0)
飞熊的头像飞熊bm

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!