elasticsearch中的Rest模块通过实现listener的方式,完成对请求的响应。
public interface ActionListener<Response> {
/**
* A response handler.
*/
void onResponse(Response response);
/**
* A failure handler.
*/
void onFailure(Throwable e);
}
ActionListener<Response>接口是对一般响应地抽象,只有正常响应和异常响应两个方面,没有任何实现——elasticsearch采用实现listener接口地方式实现异步响应,而具体对响应如何处理则看具体地实现,此处的Response可以是任意类型。
在看一看bulk请求:
void bulk(BulkRequest request, ActionListener<BulkResponse> listener);
和RestBulkAction的具体实现
client.bulk(bulkRequest, new RestBuilderListener<BulkResponse>(channel) {
@Override
public RestResponse buildResponse(BulkResponse response, XContentBuilder builder) throws Exception {
....
}
});
此处处理具体的BulkRequest,使用了具体的RestBuilderListener<BulkResponse>(channel)类,此类一定实现了ActionListener接口,此时的response是BulkResponse。
从这两个类就可以看出ES采用接口注入地方式处理响应,这样响应的具体格式,就可以在请求端实现。从buildResponse中就可以看到对BulkResponse地具体处理。
从下面的继承结构,可以看出RestResponse被处理地一般过程。
一句话概况各个类的作用:
ActionListener:最一般的响应抽象
RestActionListener:实现一般处理过程的框架,具体实现留给子类实现
RestResponseListener:将消息的构建过程以及发送过程分开
RestBuilderListener:专注于响应结构地构建
注意到elasticsearch源码的细微变化
/**
* An action listener that requires {@link #processResponse(Object)} to be implemented
* and will automatically handle failures.
*/
public abstract class RestActionListener<Response> implements ActionListener<Response> {
// we use static here so we won't have to pass the actual logger each time for a very rare case of logging
// where the settings don't matter that much
private static ESLogger logger = Loggers.getLogger(RestResponseListener.class);
protected final RestChannel channel;
protected RestActionListener(RestChannel channel) {
this.channel = channel;
}
//变成了final,固定了onResponse的处理框架,子类只需要关注正常response地处理
@Override
public final void onResponse(Response response) {
try {
processResponse(response);
} catch (Throwable t) {
onFailure(t);
}
}
protected abstract void processResponse(Response response) throws Exception;
@Override
public final void onFailure(Throwable e) {
try {
channel.sendResponse(new BytesRestResponse(channel, e));
} catch (Throwable e1) {
logger.error("failed to send failure response", e1);
}
}
}
/**
* A REST enabled action listener that has a basic onFailure implementation, and requires
* sub classes to only implement {@link #buildResponse(Object)}.
*/
public abstract class RestResponseListener<Response> extends RestActionListener<Response> {
protected RestResponseListener(RestChannel channel) {
super(channel);
}
//变成了final,固定了processResponse的处理框架,主要作用将buildResponse和sendResponse分开
@Override
protected final void processResponse(Response response) throws Exception {
channel.sendResponse(buildResponse(response));
}
/**
* Builds the response to send back through the channel.
*/
public abstract RestResponse buildResponse(Response response) throws Exception;
}
/**
* A REST action listener that builds an {@link XContentBuilder} based response.
*/
public abstract class RestBuilderListener<Response> extends RestResponseListener<Response> {
public RestBuilderListener(RestChannel channel) {
super(channel);
}
//固定buildResponse,使用XContentBuilder对Response进行构建
@Override
public final RestResponse buildResponse(Response response) throws Exception {
return buildResponse(response, channel.newBuilder());
}
/**
* Builds a response to send back over the channel.
*/
public abstract RestResponse buildResponse(Response response, XContentBuilder builder) throws Exception;
}
之后实现RestBuilderListener的子类,会具体实现buildResponse的过程。
RestChannel主要完成两件事:
1.基于Request构建基于builder的输出
2.发送response
具体elsticsearch是如何发送response的呢?请看接下来的分析。