Hadoop Rpc(一):protobuffer

概述

protobuffer在hadoop rpc中有2个作用:
1、用来序列化/反序列化消息
2、利用protobuffer生成的stub来抽象化调用过程

序列化与反序列化

编写.proto文件

option java_package = "rpc.proto";
option java_outer_classname = "MyResourceTrackerProtos";
option java_generate_equals_and_hash = true;

package rpc;

message MyResourceTrackerRequestProto {
  required string hostId = 1;
  required int32 cpu = 2;
  required int32 memory = 3;
}

message RpcHeaderProto {
  required string methodName = 1;
}

message MyResourceTrackerResponseProto {
  required bool flag = 1;
}

解释下option:

  • java_package:protoc编译生成的代码所在的包
  • java_outer_classname:protoc编译生成的代码所在的类
  • java_generate_equals_and_hash:对定义的message生成hash和equals方法

编译生成代码

由于hadoop使用的是proto2.5,这里使用2.5版本的protoc来编译proto文件



编译命令(注意路径,在proto文件所在目录下):

/usr/local/protoc2/bin/protoc --java_out=.. my_message.proto

生成的文件:


其中生成的类实现了com.google.protobuf.Message

序列化

  • 构建信息
MyResourceTrackerProtos.MyResourceTrackerRequestProto.Builder builder = MyResourceTrackerProtos.MyResourceTrackerRequestProto.newBuilder();
MyResourceTrackerProtos.MyResourceTrackerRequestProto req = builder.setCpu(1).setMemory(2).setHostId("localhost:5006").build();
  • 序列化方式一:
byte[] byteArray = req.toByteArray();

这种方式在rpc中用的比较少,因为消息最后是需要写入到输出流的。

  • 序列化方式二:
    使用输出流,没有往outputStream中写入消息的长度信息,如果有拆包的需要,则这个方式不合适,比如有2个protobuffer的消息要写入到一个outputStream。
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
req.writeTo(outputStream);
outputStream.flush();
  • 序列化方式三:
    这种用的比较多,会往outputStream中写入消息的长度信息,可以用来拆包,在网络传输中很有用。
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
req.writeDelimitedTo(outputStream);
outputStream.flush();

反序列化

  • 针对序列化方式三
byte[] byteArray1 = outputStream.toByteArray();
CodedInputStream codedInputStream = CodedInputStream.newInstance(byteArray1);
// 很重要:限制能从codedInputStream中读取的最大字节数。如果CodedInputStream中有多条消息,只会读第一条数据,不限定会读取所有数据,导致反序列化失败
codedInputStream.pushLimit(codedInputStream.readRawVarint32());
// 很重要:用来校验消息是否完整,每一个字段都对应一个tag,消息结尾的tag为0
codedInputStream.checkLastTagWas(0);
MyResourceTrackerProtos.MyResourceTrackerRequestProto deserilized = MyResourceTrackerProtos.MyResourceTrackerRequestProto.getDefaultInstance().getParserForType().parseFrom(codedInputStream);
System.out.println(deserilized);

结果:


利用protobuffer生成的stub来调用方法

定义proto文件

option java_package = "rpc.proto";
option java_outer_classname = "MyResourceTracker";
option java_generic_services = true;
option java_generate_equals_and_hash = true;

package rpc;

import "my_message.proto";

service MyResourceTrackerService {
  rpc registerNodeManager(MyResourceTrackerRequestProto) returns(MyResourceTrackerResponseProto);
}

编译生成代码

/usr/local/protoc2/bin/protoc --java_out=.. -I ./ MyResourceTracker.proto

会在rpc.proto. MyResourceTracker里生成一个抽象的静态内部类MyResourceTrackerService

使用MyResourceTrackerService来调用registerNodeManager方法

// MyResourceTrackerService有2个接口,一个是BlockingInterface,另一个是Interface,这里需要实现BlockingInterface,因为它的方法中带返回值
BlockingService blockingService = MyResourceTracker.MyResourceTrackerService.newReflectiveBlockingService(new MyResourceTracker.MyResourceTrackerService.BlockingInterface() {
            @Override
            public MyResourceTrackerProtos.MyResourceTrackerResponseProto registerNodeManager(RpcController controller, MyResourceTrackerProtos.MyResourceTrackerRequestProto request) throws ServiceException {
                return MyResourceTrackerProtos.MyResourceTrackerResponseProto.newBuilder().setFlag(true).build();
            }
        });
        Descriptors.MethodDescriptor methodByName = blockingService.getDescriptorForType().findMethodByName("registerNodeManager");
        // 获取方法的请求参数
        Message requestPrototype = blockingService.getRequestPrototype(methodByName);
// 反序列化
        Message message = requestPrototype.getParserForType().parseFrom(byteArray1);
// 调用registerNodeManager方法
        MyResourceTrackerProtos.MyResourceTrackerResponseProto resp = (MyResourceTrackerProtos.MyResourceTrackerResponseProto)blockingService.callBlockingMethod(methodByName, null, message);
        System.out.println(resp);

结果:


其它问题

问题一

如果遇到了Dependencies passed to FileDescriptor.buildFrom() don't match those listed in the FileDescriptorProto.这样的错误,需要在执行protoc命令时,执行位置须为proto文件所在目录

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容