概述
protobuffer在hadoop rpc中有2个作用:
1、用来序列化/反序列化消息
2、利用protobuffer生成的stub来抽象化调用过程
序列化与反序列化
编写.proto文件
option java_package = "rpc.proto";
option java_outer_classname = "MyResourceTrackerProtos";
option java_generate_equals_and_hash = true;
package rpc;
message MyResourceTrackerRequestProto {
required string hostId = 1;
required int32 cpu = 2;
required int32 memory = 3;
}
message RpcHeaderProto {
required string methodName = 1;
}
message MyResourceTrackerResponseProto {
required bool flag = 1;
}
解释下option:
- java_package:protoc编译生成的代码所在的包
- java_outer_classname:protoc编译生成的代码所在的类
- java_generate_equals_and_hash:对定义的message生成hash和equals方法
编译生成代码
由于hadoop使用的是proto2.5,这里使用2.5版本的protoc来编译proto文件
编译命令(注意路径,在proto文件所在目录下):
/usr/local/protoc2/bin/protoc --java_out=.. my_message.proto
生成的文件:
其中生成的类实现了
com.google.protobuf.Message
类
序列化
- 构建信息
MyResourceTrackerProtos.MyResourceTrackerRequestProto.Builder builder = MyResourceTrackerProtos.MyResourceTrackerRequestProto.newBuilder();
MyResourceTrackerProtos.MyResourceTrackerRequestProto req = builder.setCpu(1).setMemory(2).setHostId("localhost:5006").build();
- 序列化方式一:
byte[] byteArray = req.toByteArray();
这种方式在rpc中用的比较少,因为消息最后是需要写入到输出流的。
- 序列化方式二:
使用输出流,没有往outputStream中写入消息的长度信息,如果有拆包的需要,则这个方式不合适,比如有2个protobuffer的消息要写入到一个outputStream。
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
req.writeTo(outputStream);
outputStream.flush();
- 序列化方式三:
这种用的比较多,会往outputStream中写入消息的长度信息,可以用来拆包,在网络传输中很有用。
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
req.writeDelimitedTo(outputStream);
outputStream.flush();
反序列化
- 针对序列化方式三
byte[] byteArray1 = outputStream.toByteArray();
CodedInputStream codedInputStream = CodedInputStream.newInstance(byteArray1);
// 很重要:限制能从codedInputStream中读取的最大字节数。如果CodedInputStream中有多条消息,只会读第一条数据,不限定会读取所有数据,导致反序列化失败
codedInputStream.pushLimit(codedInputStream.readRawVarint32());
// 很重要:用来校验消息是否完整,每一个字段都对应一个tag,消息结尾的tag为0
codedInputStream.checkLastTagWas(0);
MyResourceTrackerProtos.MyResourceTrackerRequestProto deserilized = MyResourceTrackerProtos.MyResourceTrackerRequestProto.getDefaultInstance().getParserForType().parseFrom(codedInputStream);
System.out.println(deserilized);
结果:
利用protobuffer生成的stub来调用方法
定义proto文件
option java_package = "rpc.proto";
option java_outer_classname = "MyResourceTracker";
option java_generic_services = true;
option java_generate_equals_and_hash = true;
package rpc;
import "my_message.proto";
service MyResourceTrackerService {
rpc registerNodeManager(MyResourceTrackerRequestProto) returns(MyResourceTrackerResponseProto);
}
编译生成代码
/usr/local/protoc2/bin/protoc --java_out=.. -I ./ MyResourceTracker.proto
会在rpc.proto. MyResourceTracker
里生成一个抽象的静态内部类MyResourceTrackerService
使用MyResourceTrackerService
来调用registerNodeManager方法
// MyResourceTrackerService有2个接口,一个是BlockingInterface,另一个是Interface,这里需要实现BlockingInterface,因为它的方法中带返回值
BlockingService blockingService = MyResourceTracker.MyResourceTrackerService.newReflectiveBlockingService(new MyResourceTracker.MyResourceTrackerService.BlockingInterface() {
@Override
public MyResourceTrackerProtos.MyResourceTrackerResponseProto registerNodeManager(RpcController controller, MyResourceTrackerProtos.MyResourceTrackerRequestProto request) throws ServiceException {
return MyResourceTrackerProtos.MyResourceTrackerResponseProto.newBuilder().setFlag(true).build();
}
});
Descriptors.MethodDescriptor methodByName = blockingService.getDescriptorForType().findMethodByName("registerNodeManager");
// 获取方法的请求参数
Message requestPrototype = blockingService.getRequestPrototype(methodByName);
// 反序列化
Message message = requestPrototype.getParserForType().parseFrom(byteArray1);
// 调用registerNodeManager方法
MyResourceTrackerProtos.MyResourceTrackerResponseProto resp = (MyResourceTrackerProtos.MyResourceTrackerResponseProto)blockingService.callBlockingMethod(methodByName, null, message);
System.out.println(resp);
结果:
其它问题
问题一
如果遇到了Dependencies passed to FileDescriptor.buildFrom() don't match those listed in the FileDescriptorProto.
这样的错误,需要在执行protoc命令时,执行位置须为proto文件所在目录