前言
基于上一次的简单介绍做一次稍微深入的分析
正文
- 客户端如何发送
由上次结尾的一个简单demo作为分析的例子,看下作为客户端如何发送数据的。
public class HelloServiceClient {
public static void main(String[] args) throws TException {
System.out.println("客户端启动.....");
//1.初始化传输工具
TTransport transport =new TSocket("10.10.163.11", 8999, 30000);
//2.绑定传输协议
TProtocol protocol = new TBinaryProtocol(transport);
//3.client绑定传输协议
Hello.Client client = new Hello.Client(protocol);
transport.open();
//4.client调用,发送消息
String result = client.helloWorld("TOM");
System.out.println(result);
}
}
看一下Hello.Client调用方法
public String helloWorld(String para) throws org.apache.thrift.TException
{
send_helloWorld(para);
return recv_helloWorld();
}
public void send_helloWorld(String para) throws org.apache.thrift.TException
{
//初始化并设置参数对象
helloWorld_args args = new helloWorld_args();
args.setPara(para);
//发送信息
sendBase("helloWorld", args);
}
Hello.Client中sendBase方法
protected void sendBase(String methodName, TBase<?,?> args) throws TException {
sendBase(methodName, args, TMessageType.CALL);
}
private void sendBase(String methodName, TBase<?,?> args, byte type) throws TException {
//封装消息并写入传输协议中
oprot_.writeMessageBegin(new TMessage(methodName, type, ++seqid_));
args.write(oprot_);
oprot_.writeMessageEnd();
//发送消息
oprot_.getTransport().flush();
}
TMessage对象定义了三个成员变量,重点还是在writeMessageBegin方法中
由于我们的demo绑定的传输协议是TBinaryProtocol(二进制编码格式进行数据传输)
所以我们查看TBinaryProtocol的writeMessageBegin方法
public void writeMessageBegin(TMessage message) throws TException {
if (strictWrite_) {
int version = VERSION_1 | message.type;
writeI32(version);
writeString(message.name);
writeI32(message.seqid);
} else {
writeString(message.name);
writeByte(message.type);
writeI32(message.seqid);
}
}
public void writeI32(int i32) throws TException {
inoutTemp[0] = (byte)(0xff & (i32 >> 24));
inoutTemp[1] = (byte)(0xff & (i32 >> 16));
inoutTemp[2] = (byte)(0xff & (i32 >> 8));
inoutTemp[3] = (byte)(0xff & (i32));
trans_.write(inoutTemp, 0, 4);
}
public void writeString(String str) throws TException {
try {
byte[] dat = str.getBytes("UTF-8");
writeI32(dat.length);
trans_.write(dat, 0, dat.length);
} catch (UnsupportedEncodingException uex) {
throw new TException("JVM DOES NOT SUPPORT UTF-8");
}
}
基本上就是写入版本,写入方法长度,写入方法名,最后在写入seqid
最后再由Hello.helloWorld_args类调用helloWorld_argsStandardScheme中write方法写入参数信息
public void write(org.apache.thrift.protocol.TProtocol oprot, helloWorld_args struct) throws org.apache.thrift.TException {
struct.validate();//空方法
oprot.writeStructBegin(STRUCT_DESC);//空方法
if (struct.para != null) {
//写入参数信息
oprot.writeFieldBegin(PARA_FIELD_DESC);
oprot.writeString(struct.para);
oprot.writeFieldEnd();
}
oprot.writeFieldStop();
oprot.writeStructEnd();
}
下面看下接受信息的处理
//Hello.Client
public String recv_helloWorld() throws org.apache.thrift.TException
{
helloWorld_result result = new helloWorld_result();
receiveBase(result, "helloWorld");
if (result.isSetSuccess()) {
return result.success;
}
...
}
protected void receiveBase(TBase<?,?> result, String methodName) throws TException {
TMessage msg = iprot_.readMessageBegin();
if (msg.type == TMessageType.EXCEPTION) {
...
}
System.out.format("Received %d%n", msg.seqid);
if (msg.seqid != seqid_) {
...
}
result.read(iprot_);
iprot_.readMessageEnd();
}
以上就是客户端的整个处理流程,大概时序图如下
下面看下服务端的处理流程
先看下服务端的代码
public class HelloServiceServer {
public static void main(String[] args) throws TTransportException {
System.out.println("服务端开启......");
TProcessor tProcessor=new Hello.Processor<Hello.Iface>(new HelloServiceImpl());
TServerSocket serverSocket = new TServerSocket(8999);
TServer.Args tArgs = new TServer.Args(serverSocket);
tArgs.processor(tProcessor);
tArgs.protocolFactory(new TBinaryProtocol.Factory());
TServer server = new TSimpleServer(tArgs);
server.serve();
}
}
前面的都是一些绑定processor,protocol还有启动绑定serversocket,主要看一下server.serve();
public void serve() {
....
client = serverTransport_.accept();
if (client != null) {
processor = processorFactory_.getProcessor(client);
inputTransport = inputTransportFactory_.getTransport(client);
outputTransport = outputTransportFactory_.getTransport(client);
inputProtocol = inputProtocolFactory_.getProtocol(inputTransport);
outputProtocol = outputProtocolFactory_.getProtocol(outputTransport);
if (eventHandler_ != null) {
connectionContext = eventHandler_.createContext(inputProtocol, outputProtocol);
}
while (true) {
if (eventHandler_ != null) {
eventHandler_.processContext(connectionContext, inputTransport, outputTransport);
}
if(!processor.process(inputProtocol, outputProtocol)) {
break;
}
}
}
....
}
前面的代码主要是从客户端获取对应的数据,主要看下处理流程即processor.process()
public boolean process(TProtocol in, TProtocol out) throws TException {
TMessage msg = in.readMessageBegin();//获取TMessage
ProcessFunction fn = processMap.get(msg.name);//根据方法名获取对应的方法处理类
if (fn == null) {
.....
}
fn.process(msg.seqid, in, out, iface);
return true;
}
当获取方法处理类时,执行处理方法
//Hello.java 中helloWorld类
public helloWorld_args getEmptyArgsInstance() {
return new helloWorld_args();
}
//helloWorld_args类中read方法,与客户端对应
public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
schemes.get(iprot.getScheme()).getScheme().read(iprot, this);
}
//Hello.java 中Processor类执行了实现类HelloServiceImpl
public helloWorld_result getResult(I iface, helloWorld_args args) throws org.apache.thrift.TException {
helloWorld_result result = new helloWorld_result();
result.success = iface.helloWorld(args.para);
return result;
}
public final void process(int seqid, TProtocol iprot, TProtocol oprot, I iface) throws TException {
T args = getEmptyArgsInstance();
try {
args.read(iprot);
} catch (TProtocolException e) {
....
}
iprot.readMessageEnd();
TBase result = null;
try {
result = getResult(iface, args);
} catch(TException tex) {
....
}
if(!isOneway()) {//跟客户端差不多了
oprot.writeMessageBegin(new TMessage(getMethodName(), TMessageType.REPLY, seqid));
result.write(oprot);//结果写到输出流
oprot.writeMessageEnd();
oprot.getTransport().flush();
}
}
总结
我们再来看一下Thrift的协议栈以及上述流程的抓包信息
基本上对Thrift的流程熟悉了。