Tracing a call to GET users/_doc/1
1. Netty4HttpPipeliningHandler
@Override
public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
assert msg instanceof Netty4HttpRequest : "Invalid message type: " + msg.getClass();
HttpPipelinedRequest pipelinedRequest = aggregator.read(((Netty4HttpRequest) msg));
ctx.fireChannelRead(pipelinedRequest);
}
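Once the server has read data from the socket, Netty invokes channelRead. Netty4HttpServerTransport registers an HttpChannelHandler,
so the handler pipeline for each connection is assembled in that class's initChannel method.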
@Override
protected void initChannel(Channel ch) throws Exception {
Netty4HttpChannel nettyHttpChannel = new Netty4HttpChannel(ch);
ch.attr(HTTP_CHANNEL_KEY).set(nettyHttpChannel);
ch.pipeline().addLast("byte_buf_sizer", NettyByteBufSizer.INSTANCE);
ch.pipeline().addLast("read_timeout", new ReadTimeoutHandler(transport.readTimeoutMillis, TimeUnit.MILLISECONDS));
final HttpRequestDecoder decoder = new HttpRequestDecoder(
handlingSettings.getMaxInitialLineLength(),
handlingSettings.getMaxHeaderSize(),
handlingSettings.getMaxChunkSize()
);
decoder.setCumulator(ByteToMessageDecoder.COMPOSITE_CUMULATOR);
ch.pipeline().addLast("decoder", decoder);
ch.pipeline().addLast("decoder_compress", new HttpContentDecompressor());
ch.pipeline().addLast("encoder", new HttpResponseEncoder());
final HttpObjectAggregator aggregator = new HttpObjectAggregator(handlingSettings.getMaxContentLength());
aggregator.setMaxCumulationBufferComponents(transport.maxCompositeBufferComponents);
ch.pipeline().addLast("aggregator", aggregator);
if (handlingSettings.isCompression()) {
ch.pipeline().addLast("encoder_compress", new HttpContentCompressor(handlingSettings.getCompressionLevel()));
}
ch.pipeline().addLast("request_creator", requestCreator);
ch.pipeline().addLast("response_creator", responseCreator);
ch.pipeline().addLast("pipelining", new Netty4HttpPipeliningHandler(logger, transport.pipeliningMaxEvents));
ch.pipeline().addLast("handler", requestHandler);
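transport.serverAcceptedChannel(nettyHttpChannel);
}
The method above is typical Netty usage: it adds a series of server-side handlers to the pipeline, and the names make each handler's purpose easy to guess. Let's go straight to the last handler.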
private final Netty4HttpRequestHandler requestHandler;
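To make the ordering in initChannel concrete, here is a minimal sketch of a pipeline as an ordered list of named stages. It does not use Netty: PipelineSketch, addLast, and fireRead are hypothetical stand-ins, and the three stages only imitate what the decoder, aggregator, and final handler do.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical stand-in for a Netty ChannelPipeline: named inbound stages run in insertion order.
final class PipelineSketch {
    private final List<String> names = new ArrayList<>();
    private final List<Function<Object, Object>> stages = new ArrayList<>();

    PipelineSketch addLast(String name, Function<Object, Object> stage) {
        names.add(name);
        stages.add(stage);
        return this;
    }

    // Pass a message through every stage from head to tail.
    Object fireRead(Object msg) {
        Object current = msg;
        for (Function<Object, Object> stage : stages) {
            current = stage.apply(current);
        }
        return current;
    }

    List<String> names() {
        return names;
    }

    static PipelineSketch example() {
        return new PipelineSketch()
            .addLast("decoder", raw -> ((String) raw).trim())      // imitates bytes -> HTTP objects
            .addLast("aggregator", line -> line + " [aggregated]") // imitates chunks -> full request
            .addLast("handler", req -> "handled: " + req);         // business logic comes last
    }
}
```

Calling PipelineSketch.example().fireRead("  GET /users/_doc/1 ") runs the three stages in the order they were added, which is why the handler registered last is the one that finally sees the complete request.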
Next we step into channelRead0.
@Override
protected void channelRead0(ChannelHandlerContext ctx, HttpPipelinedRequest httpRequest) {
assert Transports.assertDefaultThreadContext(serverTransport.getThreadPool().getThreadContext());
assert Transports.assertTransportThread();
final Netty4HttpChannel channel = ctx.channel().attr(Netty4HttpServerTransport.HTTP_CHANNEL_KEY).get();
boolean success = false;
try {
// hand the request over to the server transport
serverTransport.incomingRequest(httpRequest, channel);
success = true;
} finally {
if (success == false) {
httpRequest.release();
}
}
}
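The success flag in channelRead0 is a common ownership hand-off idiom: the request is released here only if passing it downstream failed. A minimal sketch of the same idiom follows; Resource, Consumer, and handOff are hypothetical names, not Elasticsearch types.

```java
import java.util.concurrent.atomic.AtomicBoolean;

final class ReleaseOnFailureSketch {
    // Hypothetical reference-counted resource; it only records whether release() was called.
    static final class Resource {
        final AtomicBoolean released = new AtomicBoolean(false);

        void release() {
            released.set(true);
        }
    }

    interface Consumer {
        void accept(Resource resource);
    }

    // Hand the resource downstream; release it here only if the hand-off threw.
    static void handOff(Resource resource, Consumer downstream) {
        boolean success = false;
        try {
            downstream.accept(resource);
            success = true; // downstream now owns the resource
        } finally {
            if (success == false) {
                resource.release();
            }
        }
    }
}
```

If the downstream call returns normally, the resource is left alone because the callee is now responsible for it; if it throws, the finally block releases it before the exception propagates.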
Next we step into incomingRequest.
public void incomingRequest(final HttpRequest httpRequest, final HttpChannel httpChannel) {
httpClientStatsTracker.updateClientStats(httpRequest, httpChannel);
final long startTime = threadPool.relativeTimeInMillis();
try {
handleIncomingRequest(httpRequest, httpChannel, httpRequest.getInboundException());
} finally {
final long took = threadPool.relativeTimeInMillis() - startTime;
final long logThreshold = slowLogThresholdMs;
if (logThreshold > 0 && took > logThreshold) {
logger.warn(
"handling request [{}][{}][{}][{}] took [{}ms] which is above the warn threshold of [{}ms]",
httpRequest.header(Task.X_OPAQUE_ID_HTTP_HEADER),
httpRequest.method(),
httpRequest.uri(),
httpChannel,
took,
logThreshold
);
}
}
}
Then on to handleIncomingRequest.
private void handleIncomingRequest(final HttpRequest httpRequest, final HttpChannel httpChannel, final Exception exception) {
if (exception == null) {
HttpResponse earlyResponse = corsHandler.handleInbound(httpRequest);
if (earlyResponse != null) {
httpChannel.sendResponse(earlyResponse, earlyResponseListener(httpRequest, httpChannel));
httpRequest.release();
return;
}
}
Exception badRequestCause = exception;
/*
* We want to create a REST request from the incoming request from Netty. However, creating this request could fail if there
* are incorrectly encoded parameters, or the Content-Type header is invalid. If one of these specific failures occurs, we
* attempt to create a REST request again without the input that caused the exception (e.g., we remove the Content-Type header,
* or skip decoding the parameters). Once we have a request in hand, we then dispatch the request as a bad request with the
* underlying exception that caused us to treat the request as bad.
*/
final RestRequest restRequest;
{
RestRequest innerRestRequest;
try {
innerRestRequest = RestRequest.request(xContentRegistry, httpRequest, httpChannel);
} catch (final RestRequest.ContentTypeHeaderException e) {
badRequestCause = ExceptionsHelper.useOrSuppress(badRequestCause, e);
innerRestRequest = requestWithoutContentTypeHeader(httpRequest, httpChannel, badRequestCause);
} catch (final RestRequest.BadParameterException e) {
badRequestCause = ExceptionsHelper.useOrSuppress(badRequestCause, e);
innerRestRequest = RestRequest.requestWithoutParameters(xContentRegistry, httpRequest, httpChannel);
}
restRequest = innerRestRequest;
}
final HttpTracer trace = tracer.maybeTraceRequest(restRequest, exception);
/*
* We now want to create a channel used to send the response on. However, creating this channel can fail if there are invalid
* parameter values for any of the filter_path, human, or pretty parameters. We detect these specific failures via an
* IllegalArgumentException from the channel constructor and then attempt to create a new channel that bypasses parsing of these
* parameter values.
*/
final RestChannel channel;
{
RestChannel innerChannel;
ThreadContext threadContext = threadPool.getThreadContext();
try {
innerChannel = new DefaultRestChannel(
httpChannel,
httpRequest,
restRequest,
bigArrays,
handlingSettings,
threadContext,
corsHandler,
trace
);
} catch (final IllegalArgumentException e) {
badRequestCause = ExceptionsHelper.useOrSuppress(badRequestCause, e);
final RestRequest innerRequest = RestRequest.requestWithoutParameters(xContentRegistry, httpRequest, httpChannel);
innerChannel = new DefaultRestChannel(
httpChannel,
httpRequest,
innerRequest,
bigArrays,
handlingSettings,
threadContext,
corsHandler,
trace
);
}
channel = innerChannel;
}
// dispatch the request
dispatchRequest(restRequest, channel, badRequestCause);
}
// Visible for testing
void dispatchRequest(final RestRequest restRequest, final RestChannel channel, final Throwable badRequestCause) {
final ThreadContext threadContext = threadPool.getThreadContext();
try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
if (badRequestCause != null) {
dispatcher.dispatchBadRequest(channel, threadContext, badRequestCause);
} else {
dispatcher.dispatchRequest(restRequest, channel, threadContext);
}
}
}
protected final Dispatcher dispatcher;
Dispatcher is an interface; the implementation used here is RestController.
interface Dispatcher {
/**
* Dispatches the {@link RestRequest} to the relevant request handler or responds to the given rest channel directly if
* the request can't be handled by any request handler.
*
* @param request the request to dispatch
* @param channel the response channel of this request
* @param threadContext the thread context
*/
void dispatchRequest(RestRequest request, RestChannel channel, ThreadContext threadContext);
/**
* Dispatches a bad request. For example, if a request is malformed it will be dispatched via this method with the cause of the bad
* request.
*
* @param channel the response channel of this request
* @param threadContext the thread context
* @param cause the cause of the bad request
*/
void dispatchBadRequest(RestChannel channel, ThreadContext threadContext, Throwable cause);
}
Now let's look at the method in RestController.
@Override
public void dispatchRequest(RestRequest request, RestChannel channel, ThreadContext threadContext) {
// add the Elastic product header to the response
threadContext.addResponseHeader(ELASTIC_PRODUCT_HTTP_HEADER, ELASTIC_PRODUCT_HTTP_HEADER_VALUE);
try {
// try every handler registered for this path
tryAllHandlers(request, channel, threadContext);
} catch (Exception e) {
try {
channel.sendResponse(new BytesRestResponse(channel, e));
} catch (Exception inner) {
inner.addSuppressed(e);
logger.error(() -> new ParameterizedMessage("failed to send failure response for uri [{}]", request.uri()), inner);
}
}
}
Step into tryAllHandlers.
private void tryAllHandlers(final RestRequest request, final RestChannel channel, final ThreadContext threadContext)
throws Exception {
try {
copyRestHeaders(request, threadContext);
validateErrorTrace(request, channel);
} catch (IllegalArgumentException e) {
channel.sendResponse(BytesRestResponse.createSimpleErrorResponse(channel, BAD_REQUEST, e.getMessage()));
return;
}
// for this request, rawPath = /users/_doc/1
final String rawPath = request.rawPath();
final String uri = request.uri();
final RestRequest.Method requestMethod;
try {
// Resolves the HTTP method and fails if the method is invalid
requestMethod = request.method();
// Loop through all possible handlers, attempting to dispatch the request
Iterator<MethodHandlers> allHandlers = getAllHandlers(request.params(), rawPath);
while (allHandlers.hasNext()) {
final RestHandler handler;
final MethodHandlers handlers = allHandlers.next();
if (handlers == null) {
handler = null;
} else {
handler = handlers.getHandler(requestMethod);
}
if (handler == null) {
if (handleNoHandlerFound(rawPath, requestMethod, uri, channel)) {
return;
}
} else {
dispatchRequest(request, channel, handler, threadContext);
return;
}
}
} catch (final IllegalArgumentException e) {
handleUnsupportedHttpMethod(uri, null, channel, getValidHandlerMethodSet(rawPath), e);
return;
}
// If request has not been handled, fallback to a bad request error.
handleBadRequest(uri, requestMethod, channel);
}
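tryAllHandlers resolves a handler from the raw path, and RestGetAction later reads request.param("index") and request.param("id"). The sketch below shows one simple way a path such as /users/_doc/1 could be matched against a pattern and turned into those parameters. It is an illustration only: RouteMatchSketch and match are hypothetical, the pattern string is assumed, and the real lookup in RestController is not shown in this walkthrough.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

final class RouteMatchSketch {
    // Match a path against a pattern with {placeholders}; return the extracted parameters on success.
    static Optional<Map<String, String>> match(String pattern, String path) {
        String[] expected = pattern.split("/");
        String[] actual = path.split("/");
        if (expected.length != actual.length) {
            return Optional.empty();
        }
        Map<String, String> params = new HashMap<>();
        for (int i = 0; i < expected.length; i++) {
            String segment = expected[i];
            boolean placeholder = segment.length() > 2 && segment.startsWith("{") && segment.endsWith("}");
            if (placeholder) {
                // "{index}" -> key "index", value taken from the same position in the path
                params.put(segment.substring(1, segment.length() - 1), actual[i]);
            } else if (segment.equals(actual[i]) == false) {
                return Optional.empty();
            }
        }
        return Optional.of(params);
    }
}
```

With the assumed pattern /{index}/_doc/{id}, the path /users/_doc/1 yields index = users and id = 1, while a pattern with a different number of segments does not match at all.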
private void dispatchRequest(RestRequest request, RestChannel channel, RestHandler handler, ThreadContext threadContext)
throws Exception {
final int contentLength = request.contentLength();
if (contentLength > 0) {
final XContentType xContentType = request.getXContentType();
if (xContentType == null) {
sendContentTypeErrorMessage(request.getAllHeaderValues("Content-Type"), channel);
return;
}
if (handler.supportsContentStream() && xContentType != XContentType.JSON && xContentType != XContentType.SMILE) {
channel.sendResponse(
BytesRestResponse.createSimpleErrorResponse(
channel,
RestStatus.NOT_ACCEPTABLE,
"Content-Type [" + xContentType + "] does not support stream parsing. Use JSON or SMILE instead"
)
);
return;
}
}
RestChannel responseChannel = channel;
try {
if (handler.canTripCircuitBreaker()) {
inFlightRequestsBreaker(circuitBreakerService).addEstimateBytesAndMaybeBreak(contentLength, "<http_request>");
} else {
inFlightRequestsBreaker(circuitBreakerService).addWithoutBreaking(contentLength);
}
// iff we could reserve bytes for the request we need to send the response also over this channel
responseChannel = new ResourceHandlingHttpChannel(channel, circuitBreakerService, contentLength);
// TODO: Count requests double in the circuit breaker if they need copying?
if (handler.allowsUnsafeBuffers() == false) {
request.ensureSafeBuffers();
}
if (handler.allowSystemIndexAccessByDefault() == false) {
// The ELASTIC_PRODUCT_ORIGIN_HTTP_HEADER indicates that the request is coming from an Elastic product and
// therefore we should allow a subset of external system index access.
// This header is intended for internal use only.
final String prodOriginValue = request.header(Task.X_ELASTIC_PRODUCT_ORIGIN_HTTP_HEADER);
if (prodOriginValue != null) {
threadContext.putHeader(SYSTEM_INDEX_ACCESS_CONTROL_HEADER_KEY, Boolean.TRUE.toString());
threadContext.putHeader(EXTERNAL_SYSTEM_INDEX_ACCESS_CONTROL_HEADER_KEY, prodOriginValue);
} else {
threadContext.putHeader(SYSTEM_INDEX_ACCESS_CONTROL_HEADER_KEY, Boolean.FALSE.toString());
}
} else {
threadContext.putHeader(SYSTEM_INDEX_ACCESS_CONTROL_HEADER_KEY, Boolean.TRUE.toString());
}
handler.handleRequest(request, responseChannel, client);
} catch (Exception e) {
responseChannel.sendResponse(new BytesRestResponse(responseChannel, e));
}
}
Then we step into handler.handleRequest(request, responseChannel, client);
Here handler is a SecurityRestFilter:
@Override
public void handleRequest(RestRequest request, RestChannel channel, NodeClient client) throws Exception {
if (licenseState.isSecurityEnabled() && request.method() != Method.OPTIONS) {
// CORS - allow for preflight unauthenticated OPTIONS request
if (extractClientCertificate) {
HttpChannel httpChannel = request.getHttpChannel();
SSLEngineUtils.extractClientCertificates(logger, threadContext, httpChannel);
}
final String requestUri = request.uri();
authenticationService.authenticate(maybeWrapRestRequest(request), ActionListener.wrap(authentication -> {
if (authentication == null) {
logger.trace("No authentication available for REST request [{}]", requestUri);
} else {
logger.trace("Authenticated REST request [{}] as {}", requestUri, authentication);
}
secondaryAuthenticator.authenticateAndAttachToContext(request, ActionListener.wrap(secondaryAuthentication -> {
if (secondaryAuthentication != null) {
logger.trace("Found secondary authentication {} in REST request [{}]", secondaryAuthentication, requestUri);
}
RemoteHostHeader.process(request, threadContext);
restHandler.handleRequest(request, channel, client);
}, e -> handleException("Secondary authentication", request, channel, e)));
}, e -> handleException("Authentication", request, channel, e)));
} else {
if (request.method() != Method.OPTIONS) {
HeaderWarning.addWarning(
"Elasticsearch built-in security features are not enabled. Without "
+ "authentication, your cluster could be accessible to anyone. See "
+ "https://www.elastic.co/guide/en/elasticsearch/reference/"
+ Version.CURRENT.major
+ "."
+ Version.CURRENT.minor
+ "/security-minimal-setup.html to enable security."
);
}
restHandler.handleRequest(request, channel, client);
}
}
Here restHandler is a RestGetAction,
so the call lands in handleRequest of BaseRestHandler.
@Override
public final void handleRequest(RestRequest request, RestChannel channel, NodeClient client) throws Exception {
// prepare the request for execution; has the side effect of touching the request parameters
final RestChannelConsumer action = prepareRequest(request, client);
// validate unconsumed params, but we must exclude params used to format the response
// use a sorted set so the unconsumed parameters appear in a reliable sorted order
final SortedSet<String> unconsumedParams = request.unconsumedParams()
.stream()
.filter(p -> responseParams().contains(p) == false)
.collect(Collectors.toCollection(TreeSet::new));
// validate the non-response params
if (unconsumedParams.isEmpty() == false) {
final Set<String> candidateParams = new HashSet<>();
candidateParams.addAll(request.consumedParams());
candidateParams.addAll(responseParams());
throw new IllegalArgumentException(unrecognized(request, unconsumedParams, candidateParams, "parameter"));
}
if (request.hasContent() && request.isContentConsumed() == false) {
throw new IllegalArgumentException("request [" + request.method() + " " + request.path() + "] does not support having a body");
}
usageCount.increment();
// execute the action
action.accept(channel);
}
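The check for unconsumed parameters relies on reads having a side effect: every parameter the handler asks for is remembered, and whatever is left over is reported as unrecognized. A minimal sketch of that bookkeeping, with a hypothetical class ConsumedParamsSketch standing in for the request object:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;

final class ConsumedParamsSketch {
    private final Map<String, String> params;
    private final Set<String> consumed = new HashSet<>();

    ConsumedParamsSketch(Map<String, String> params) {
        this.params = new HashMap<>(params);
    }

    // Reading a parameter marks it as consumed, whether or not it was present.
    String param(String key) {
        consumed.add(key);
        return params.get(key);
    }

    // Parameters that were sent but never read, sorted for stable error messages.
    SortedSet<String> unconsumedParams() {
        SortedSet<String> rest = new TreeSet<>(params.keySet());
        rest.removeAll(consumed);
        return rest;
    }
}
```

A request carrying index, id, and a misspelled prety would leave prety unconsumed after the handler reads index and id, which is the situation in which handleRequest throws an IllegalArgumentException.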
Only now do we actually reach RestGetAction, through prepareRequest.
@Override
public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
GetRequest getRequest;
if (request.hasParam("type")) {
deprecationLogger.critical(DeprecationCategory.TYPES, "get_with_types", TYPES_DEPRECATION_MESSAGE);
getRequest = new GetRequest(request.param("index"), request.param("type"), request.param("id"));
} else {
getRequest = new GetRequest(request.param("index"), request.param("id"));
}
getRequest.refresh(request.paramAsBoolean("refresh", getRequest.refresh()));
getRequest.routing(request.param("routing"));
getRequest.preference(request.param("preference"));
getRequest.realtime(request.paramAsBoolean("realtime", getRequest.realtime()));
if (request.param("fields") != null) {
throw new IllegalArgumentException(
"the parameter [fields] is no longer supported, "
+ "please use [stored_fields] to retrieve stored fields or [_source] to load the field from _source"
);
}
final String fieldsParam = request.param("stored_fields");
if (fieldsParam != null) {
final String[] fields = Strings.splitStringByCommaToArray(fieldsParam);
if (fields != null) {
getRequest.storedFields(fields);
}
}
getRequest.version(RestActions.parseVersion(request));
getRequest.versionType(VersionType.fromString(request.param("version_type"), getRequest.versionType()));
getRequest.fetchSourceContext(FetchSourceContext.parseFromRestRequest(request));
return channel -> client.get(getRequest, new RestToXContentListener<GetResponse>(channel) {
@Override
protected RestStatus getStatus(final GetResponse response) {
return response.isExists() ? OK : NOT_FOUND;
}
});
}
Then comes client.get, which looks like the core call. The client is a NodeClient.
The get call first lands in AbstractClient's get method.
@Override
public void get(final GetRequest request, final ActionListener<GetResponse> listener) {
execute(GetAction.INSTANCE, request, listener);
}
Then on to the generic execute method (public final, not private).
@Override
public final <Request extends ActionRequest, Response extends ActionResponse> void execute(
ActionType<Response> action,
Request request,
ActionListener<Response> listener
) {
try {
listener = threadedWrapper.wrap(listener);
doExecute(action, request, listener);
} catch (Exception e) {
assert false : new AssertionError(e);
listener.onFailure(e);
}
}
Then doExecute. It is declared protected abstract in AbstractClient and implemented by NodeClient.
protected abstract <Request extends ActionRequest, Response extends ActionResponse> void doExecute(
ActionType<Response> action,
Request request,
ActionListener<Response> listener
);
@Override
public <Request extends ActionRequest, Response extends ActionResponse> void doExecute(
ActionType<Response> action,
Request request,
ActionListener<Response> listener
) {
// Discard the task because the Client interface doesn't use it.
try {
executeLocally(action, request, listener);
} catch (TaskCancelledException | IllegalArgumentException | IllegalStateException e) {
// #executeLocally returns the task and throws TaskCancelledException if it fails to register the task because the parent
// task has been cancelled, IllegalStateException if the client was not in a state to execute the request because it was not
// yet properly initialized or IllegalArgumentException if header validation fails we forward them to listener since this API
// does not concern itself with the specifics of the task handling
listener.onFailure(e);
}
}
Then on to executeLocally (a public method).
public <Request extends ActionRequest, Response extends ActionResponse> Task executeLocally(
ActionType<Response> action,
Request request,
ActionListener<Response> listener
) {
return transportAction(action).execute(request, listener);
}
It first looks up the TransportAction registered for the action type, then calls execute on it.
/**
* Get the {@link TransportAction} for an {@link ActionType}, throwing exceptions if the action isn't available.
*/
private <Request extends ActionRequest, Response extends ActionResponse> TransportAction<Request, Response> transportAction(
ActionType<Response> action
) {
if (actions == null) {
throw new IllegalStateException("NodeClient has not been initialized");
}
@SuppressWarnings("unchecked")
TransportAction<Request, Response> transportAction = (TransportAction<Request, Response>) actions.get(action);
if (transportAction == null) {
throw new IllegalStateException("failed to find action [" + action + "] to execute");
}
return transportAction;
}
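The lookup in transportAction is essentially a guarded map access: fail if the registry has not been set yet, fail if nothing is registered under the key, otherwise return the action. A minimal sketch under simplifying assumptions: ActionRegistrySketch is hypothetical, actions are keyed by a plain string, and each action is modeled as a Function from request to response.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

final class ActionRegistrySketch {
    // Stays null until initialize() is called, like the actions field checked above.
    private Map<String, Function<String, String>> actions;

    void initialize(Map<String, Function<String, String>> registered) {
        this.actions = new HashMap<>(registered);
    }

    Function<String, String> transportAction(String actionName) {
        if (actions == null) {
            throw new IllegalStateException("client has not been initialized");
        }
        Function<String, String> action = actions.get(actionName);
        if (action == null) {
            throw new IllegalStateException("failed to find action [" + actionName + "] to execute");
        }
        return action;
    }

    // Look the action up, then run it with the request.
    String executeLocally(String actionName, String request) {
        return transportAction(actionName).apply(request);
    }
}
```

Nothing is wrapped or constructed per request here; the action object already exists in the registry and is simply retrieved.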
Now we are inside TransportAction.
public final Task execute(Request request, ActionListener<Response> listener) {
/*
* While this version of execute could delegate to the TaskListener
* version of execute that'd add yet another layer of wrapping on the
* listener and prevent us from using the listener bare if there isn't a
* task. That just seems like too many objects. Thus the two versions of
* this method.
*/
final Releasable unregisterChildNode = registerChildNode(request.getParentTask());
final Task task;
try {
task = taskManager.register("transport", actionName, request);
} catch (TaskCancelledException e) {
unregisterChildNode.close();
throw e;
}
execute(task, request, new ActionListener<Response>() {
@Override
public void onResponse(Response response) {
try {
Releasables.close(unregisterChildNode, () -> taskManager.unregister(task));
} finally {
// pass the response back to the caller's listener
listener.onResponse(response);
}
}
@Override
public void onFailure(Exception e) {
try {
Releasables.close(unregisterChildNode, () -> taskManager.unregister(task));
} finally {
listener.onFailure(e);
}
}
});
return task;
}
Now let's look at the three-argument execute.
/**
* Use this method when the transport action should continue to run in the context of the current task
*/
public final void execute(Task task, Request request, ActionListener<Response> listener) {
ActionRequestValidationException validationException = request.validate();
if (validationException != null) {
listener.onFailure(validationException);
return;
}
if (task != null && request.getShouldStoreResult()) {
listener = new TaskResultStoringActionListener<>(taskManager, task, listener);
}
// build a filter chain
RequestFilterChain<Request, Response> requestFilterChain = new RequestFilterChain<>(this, logger);
// then start processing
requestFilterChain.proceed(task, actionName, request, listener);
}
Then on to proceed in RequestFilterChain.
@Override
public void proceed(Task task, String actionName, Request request, ActionListener<Response> listener) {
int i = index.getAndIncrement();
try {
// when this branch is taken is worth a closer look later; follow the main path first
if (i < this.action.filters.length) {
this.action.filters[i].apply(task, actionName, request, listener, this);
} else if (i == this.action.filters.length) {
this.action.doExecute(task, request, listener);
} else {
listener.onFailure(new IllegalStateException("proceed was called too many times"));
}
} catch (Exception e) {
logger.trace("Error during transport action execution.", e);
listener.onFailure(e);
}
}
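The shape of proceed is easier to see in isolation: an atomic index walks the filter array, each filter decides whether to call proceed again, and once the index equals the number of filters the action itself runs. So the first branch is taken whenever at least one filter is registered for the action. A minimal sketch, where FilterChainSketch, Filter, and the two example filters are hypothetical:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

final class FilterChainSketch {
    interface Filter {
        void apply(String request, List<String> trace, FilterChainSketch chain);
    }

    private final Filter[] filters;
    private final AtomicInteger index = new AtomicInteger();

    FilterChainSketch(Filter... filters) {
        this.filters = filters;
    }

    void proceed(String request, List<String> trace) {
        int i = index.getAndIncrement();
        if (i < filters.length) {
            filters[i].apply(request, trace, this);   // filter i runs and may continue the chain
        } else if (i == filters.length) {
            trace.add("doExecute(" + request + ")"); // all filters passed: run the action
        } else {
            throw new IllegalStateException("proceed was called too many times");
        }
    }

    static List<String> demo() {
        List<String> trace = new ArrayList<>();
        Filter auth = (req, t, chain) -> { t.add("auth"); chain.proceed(req, t); };
        Filter audit = (req, t, chain) -> { t.add("audit"); chain.proceed(req, t); };
        new FilterChainSketch(auth, audit).proceed("get", trace);
        return trace;
    }
}
```

demo() records auth, audit, and then doExecute(get), in that order. With no filters at all, the very first call to proceed goes straight to the doExecute branch.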
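Then we reach this.action.doExecute(task, request, listener);
The listing below is from TransportReplicationAction. Note that GetAction itself is served by TransportGetAction, which extends TransportSingleShardAction; TransportReplicationAction is the base class for replicated write actions, so what follows traces that path.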
@Override
protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
assert request.shardId() != null : "request shardId must be set";
runReroutePhase(task, request, listener, true);
}
private void runReroutePhase(Task task, Request request, ActionListener<Response> listener, boolean initiatedByNodeClient) {
try {
new ReroutePhase((ReplicationTask) task, request, listener, initiatedByNodeClient).run();
} catch (RuntimeException e) {
listener.onFailure(e);
}
}
ReroutePhase is a Runnable. Its run method is defined in AbstractRunnable, and doRun is implemented by the subclass.
@Override
public final void run() {
try {
doRun();
} catch (Exception t) {
onFailure(t);
} finally {
onAfter();
}
}
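This is the template method pattern: run is final and fixes the order of doRun, onFailure, and onAfter, while subclasses supply only doRun. A minimal sketch, with a hypothetical class RunnableTemplateSketch that records the hooks it passes through:

```java
import java.util.ArrayList;
import java.util.List;

abstract class RunnableTemplateSketch implements Runnable {
    final List<String> events = new ArrayList<>();

    // The only method a subclass has to provide.
    protected abstract void doRun() throws Exception;

    protected void onFailure(Exception e) {
        events.add("failure: " + e.getMessage());
    }

    protected void onAfter() {
        events.add("after");
    }

    @Override
    public final void run() {
        try {
            doRun();
        } catch (Exception e) {
            onFailure(e); // exceptions never escape run()
        } finally {
            onAfter();    // always runs, on success and on failure
        }
    }

    static List<String> demo() {
        RunnableTemplateSketch phase = new RunnableTemplateSketch() {
            @Override
            protected void doRun() throws Exception {
                events.add("routing");
                throw new Exception("primary shard is not active");
            }
        };
        phase.run();
        return phase.events;
    }
}
```

In demo() the subclass fails halfway, so the recorded sequence is the routing step, then the failure hook, then the after hook.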
ReroutePhase's doRun is somewhat involved.
@Override
protected void doRun() {
setPhase(task, "routing");
final ClusterState state = observer.setAndGetObservedState();
final ClusterBlockException blockException = blockExceptions(state, request.shardId().getIndexName());
if (blockException != null) {
if (blockException.retryable()) {
logger.trace("cluster is blocked, scheduling a retry", blockException);
retry(blockException);
} else {
finishAsFailed(blockException);
}
} else {
// index metadata
final IndexMetadata indexMetadata = state.metadata().index(request.shardId().getIndex());
if (indexMetadata == null) {
// ensure that the cluster state on the node is at least as high as the node that decided that the index was there
if (state.version() < request.routedBasedOnClusterVersion()) {
logger.trace(
"failed to find index [{}] for request [{}] despite sender thinking it would be here. "
+ "Local cluster state version [{}]] is older than on sending node (version [{}]), scheduling a retry...",
request.shardId().getIndex(),
request,
state.version(),
request.routedBasedOnClusterVersion()
);
retry(
new IndexNotFoundException(
"failed to find index as current cluster state with version ["
+ state.version()
+ "] is stale (expected at least ["
+ request.routedBasedOnClusterVersion()
+ "]",
request.shardId().getIndexName()
)
);
return;
} else {
finishAsFailed(new IndexNotFoundException(request.shardId().getIndex()));
return;
}
}
if (indexMetadata.getState() == IndexMetadata.State.CLOSE) {
finishAsFailed(new IndexClosedException(indexMetadata.getIndex()));
return;
}
if (request.waitForActiveShards() == ActiveShardCount.DEFAULT) {
// if the wait for active shard count has not been set in the request,
// resolve it from the index settings
request.waitForActiveShards(indexMetadata.getWaitForActiveShards());
}
assert request.waitForActiveShards() != ActiveShardCount.DEFAULT
: "request waitForActiveShards must be set in resolveRequest";
// shard routing: find the primary shard
final ShardRouting primary = state.getRoutingTable().shardRoutingTable(request.shardId()).primaryShard();
if (primary == null || primary.active() == false) {
logger.trace(
"primary shard [{}] is not yet active, scheduling a retry: action [{}], request [{}], "
+ "cluster state version [{}]",
request.shardId(),
actionName,
request,
state.version()
);
retryBecauseUnavailable(request.shardId(), "primary shard is not active");
return;
}
if (state.nodes().nodeExists(primary.currentNodeId()) == false) {
logger.trace(
"primary shard [{}] is assigned to an unknown node [{}], scheduling a retry: action [{}], request [{}], "
+ "cluster state version [{}]",
request.shardId(),
primary.currentNodeId(),
actionName,
request,
state.version()
);
retryBecauseUnavailable(request.shardId(), "primary shard isn't assigned to a known node.");
return;
}
final DiscoveryNode node = state.nodes().get(primary.currentNodeId());
// only a single node is running locally, so the local branch is taken
if (primary.currentNodeId().equals(state.nodes().getLocalNodeId())) {
performLocalAction(state, primary, node, indexMetadata);
} else {
performRemoteAction(state, primary, node);
}
}
}
private void performLocalAction(ClusterState state, ShardRouting primary, DiscoveryNode node, IndexMetadata indexMetadata) {
setPhase(task, "waiting_on_primary");
if (logger.isTraceEnabled()) {
logger.trace(
"send action [{}] to local primary [{}] for request [{}] with cluster state version [{}] to [{}] ",
transportPrimaryAction,
request.shardId(),
request,
state.version(),
primary.currentNodeId()
);
}
// perform the action
performAction(
node,
transportPrimaryAction,
true,
new ConcreteShardRequest<>(
request,
primary.allocationId().getId(),
indexMetadata.primaryTerm(primary.id()),
true,
initiatedByNodeClient
)
);
}
private void performAction(
final DiscoveryNode node,
final String action,
final boolean isPrimaryAction,
final TransportRequest requestToPerform
) {
// this is where the request is sent to the target node
transportService.sendRequest(node, action, requestToPerform, transportOptions, new TransportResponseHandler<Response>() {
@Override
public Response read(StreamInput in) throws IOException {
return newResponseInstance(in);
}
@Override
public void handleResponse(Response response) {
finishOnSuccess(response);
}
@Override
public void handleException(TransportException exp) {
try {
// if we got disconnected from the node, or the node / shard is not in the right state (being closed)
final Throwable cause = exp.unwrapCause();
if (cause instanceof ConnectTransportException
|| cause instanceof NodeClosedException
|| (isPrimaryAction && retryPrimaryException(cause))) {
logger.trace(
() -> new ParameterizedMessage(
"received an error from node [{}] for request [{}], scheduling a retry",
node.getId(),
requestToPerform
),
exp
);
retry(exp);
} else {
finishAsFailed(exp);
}
} catch (Exception e) {
e.addSuppressed(exp);
finishWithUnexpectedFailure(e);
}
}
});
}
Let's take a look at TransportService.
public final <T extends TransportResponse> void sendRequest(
final DiscoveryNode node,
final String action,
final TransportRequest request,
final TransportRequestOptions options,
TransportResponseHandler<T> handler
) {
final Transport.Connection connection;
try {
// get the connection to the node
connection = getConnection(node);
} catch (final NodeNotConnectedException ex) {
// the caller might not handle this so we invoke the handler
handler.handleException(ex);
return;
}
// send the request
sendRequest(connection, action, request, options, handler);
}
/**
* Sends a request on the specified connection. If there is a failure sending the request, the specified handler is invoked.
*
* @param connection the connection to send the request on
* @param action the name of the action
* @param request the request
* @param options the options for this request
* @param handler the response handler
* @param <T> the type of the transport response
*/
public final <T extends TransportResponse> void sendRequest(
final Transport.Connection connection,
final String action,
final TransportRequest request,
final TransportRequestOptions options,
final TransportResponseHandler<T> handler
) {
try {
final TransportResponseHandler<T> delegate;
if (request.getParentTask().isSet()) {
// If the connection is a proxy connection, then we will create a cancellable proxy task on the proxy node and an actual
// child task on the target node of the remote cluster.
// ----> a parent task on the local cluster
// |
// ----> a proxy task on the proxy node on the remote cluster
// |
// ----> an actual child task on the target node on the remote cluster
// To cancel the child task on the remote cluster, we must send a cancel request to the proxy node instead of the target
// node as the parent task of the child task is the proxy task not the parent task on the local cluster. Hence, here we
// unwrap the connection and keep track of the connection to the proxy node instead of the proxy connection.
final Transport.Connection unwrappedConn = unwrapConnection(connection);
final Releasable unregisterChildNode = taskManager.registerChildConnection(request.getParentTask().getId(), unwrappedConn);
delegate = new TransportResponseHandler<T>() {
@Override
public void handleResponse(T response) {
unregisterChildNode.close();
handler.handleResponse(response);
}
@Override
public void handleException(TransportException exp) {
unregisterChildNode.close();
handler.handleException(exp);
}
@Override
public String executor() {
return handler.executor();
}
@Override
public T read(StreamInput in) throws IOException {
return handler.read(in);
}
@Override
public String toString() {
return getClass().getName() + "/[" + action + "]:" + handler.toString();
}
};
} else {
delegate = handler;
}
// send asynchronously
asyncSender.sendRequest(connection, action, request, options, delegate);
} catch (final Exception ex) {
// the caller might not handle this so we invoke the handler
final TransportException te;
if (ex instanceof TransportException) {
te = (TransportException) ex;
} else {
te = new SendRequestTransportException(connection.getNode(), action, ex);
}
handler.handleException(te);
}
}
Now we reach the asynchronous send.
private final TransportInterceptor.AsyncSender asyncSender;
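Because the send is asynchronous, the result comes back through the TransportResponseHandler callback rather than as a return value; presumably a thread pool runs those callbacks, which would be what the handler's executor() method above names.
Next we land in interceptSender, which wraps the sender with security checks: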
@Override
public AsyncSender interceptSender(AsyncSender sender) {
return new AsyncSender() {
@Override
public <T extends TransportResponse> void sendRequest(
Transport.Connection connection,
String action,
TransportRequest request,
TransportRequestOptions options,
TransportResponseHandler<T> handler
) {
final boolean requireAuth = shouldRequireExistingAuthentication();
// the transport in core normally does this check, BUT since we are serializing to a string header we need to do it
// ourselves otherwise we wind up using a version newer than what we can actually send
final Version minVersion = Version.min(connection.getVersion(), Version.CURRENT);
// Sometimes a system action gets executed like a internal create index request or update mappings request
// which means that the user is copied over to system actions so we need to change the user
if (AuthorizationUtils.shouldReplaceUserWithSystem(threadPool.getThreadContext(), action)) {
securityContext.executeAsUser(
SystemUser.INSTANCE,
(original) -> sendWithUser(
connection,
action,
request,
options,
new ContextRestoreResponseHandler<>(threadPool.getThreadContext().wrapRestorable(original), handler),
sender,
requireAuth
),
minVersion
);
} else if (AuthorizationUtils.shouldSetUserBasedOnActionOrigin(threadPool.getThreadContext())) {
AuthorizationUtils.switchUserBasedOnActionOriginAndExecute(
threadPool.getThreadContext(),
securityContext,
(original) -> sendWithUser(
connection,
action,
request,
options,
new ContextRestoreResponseHandler<>(threadPool.getThreadContext().wrapRestorable(original), handler),
sender,
requireAuth
)
);
} else if (securityContext.getAuthentication() != null
&& securityContext.getAuthentication().getVersion().equals(minVersion) == false) {
// re-write the authentication since we want the authentication version to match the version of the connection
securityContext.executeAfterRewritingAuthentication(
original -> sendWithUser(
connection,
action,
request,
options,
new ContextRestoreResponseHandler<>(threadPool.getThreadContext().wrapRestorable(original), handler),
sender,
requireAuth
),
minVersion
);
} else {
sendWithUser(connection, action, request, options, handler, sender, requireAuth);
}
}
};
}
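Next is securityContext.executeAsUser, which stashes the current thread context, sets the given user, runs the consumer, and restores the stashed context on exit.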
public void executeAsUser(User user, Consumer<StoredContext> consumer, Version version) {
final StoredContext original = threadContext.newStoredContext(true);
try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
setUser(user, version);
consumer.accept(original);
}
}
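The stash-then-restore idiom in executeAsUser is easier to see in isolation. Below is a minimal sketch, assuming a hypothetical MiniContext class built on a plain ThreadLocal. It is not Elasticsearch's ThreadContext, and every name in it is invented for illustration.

```java
import java.util.function.Consumer;

// Hypothetical stand-in for ThreadContext: holds one "user" value per thread.
final class MiniContext {
    private final ThreadLocal<String> user = new ThreadLocal<>();

    // A handle that puts a previously captured value back when closed.
    interface Stored extends AutoCloseable {
        @Override
        void close();
    }

    String getUser() {
        return user.get();
    }

    void setUser(String value) {
        user.set(value);
    }

    // Capture the current value without changing it.
    Stored newStoredContext() {
        final String captured = user.get();
        return () -> user.set(captured);
    }

    // Clear the current value and return a handle that restores it on close.
    Stored stashContext() {
        final Stored previous = newStoredContext();
        user.remove();
        return previous;
    }

    // Same shape as executeAsUser: run the consumer as another user, then restore.
    void executeAsUser(String runAs, Consumer<Stored> consumer) {
        final Stored original = newStoredContext();
        try (Stored ignore = stashContext()) {
            setUser(runAs);
            consumer.accept(original);
        }
    }
}
```

The consumer receives the original context so that a response handler can switch back to the caller's identity later, which is what ContextRestoreResponseHandler is used for in interceptSender.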
Next is sendWithUser.
private <T extends TransportResponse> void sendWithUser(
Transport.Connection connection,
String action,
TransportRequest request,
TransportRequestOptions options,
TransportResponseHandler<T> handler,
AsyncSender sender,
final boolean requireAuthentication
) {
if (securityContext.getAuthentication() == null && requireAuthentication) {
// we use an assertion here to ensure we catch this in our testing infrastructure, but leave the ISE for cases we do not catch
// in tests and may be hit by a user
assertNoAuthentication(action);
throw new IllegalStateException("there should always be a user when sending a message for action [" + action + "]");
}
try {
sender.sendRequest(connection, action, request, options, handler);
} catch (Exception e) {
handler.handleException(new SendRequestTransportException(connection.getNode(), action, e));
}
}
This sender is a method reference (used like a lambda), wired up in TransportService:
this.asyncSender = interceptor.interceptSender(this::sendRequestInternal);
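The wiring above is a decorator built from a method reference: the interceptor receives the real sender and returns a new sender that does extra work before delegating. A minimal sketch of that shape follows, using hypothetical MiniSender and MiniTransport types with a much smaller signature than the real AsyncSender.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, simplified sender: one abstract method, so a method reference can implement it.
interface MiniSender {
    void sendRequest(String action, String request, Consumer<String> handler);
}

final class MiniTransport {
    final List<String> log = new ArrayList<>();
    private final MiniSender sender;

    MiniTransport() {
        // Same shape as interceptor.interceptSender(this::sendRequestInternal)
        this.sender = intercept(this::sendRequestInternal);
    }

    // The "interceptor": returns a new sender that does extra work, then delegates.
    private MiniSender intercept(MiniSender delegate) {
        return (action, request, handler) -> {
            log.add("intercepted " + action);
            delegate.sendRequest(action, request, handler);
        };
    }

    // The innermost sender: here it answers immediately through the handler callback.
    private void sendRequestInternal(String action, String request, Consumer<String> handler) {
        log.add("sent " + action);
        handler.accept("response to " + request);
    }

    void send(String action, String request, Consumer<String> handler) {
        sender.sendRequest(action, request, handler);
    }
}
```

Because the caller only ever sees the wrapped sender, security checks can be added or removed without touching the code that sends requests.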
So the call lands in sendRequestInternal.
private <T extends TransportResponse> void sendRequestInternal(
final Transport.Connection connection,
final String action,
final TransportRequest request,
final TransportRequestOptions options,
TransportResponseHandler<T> handler
) {
if (connection == null) {
throw new IllegalStateException("can't send request to a null connection");
}
DiscoveryNode node = connection.getNode();
Supplier<ThreadContext.StoredContext> storedContextSupplier = threadPool.getThreadContext().newRestorableContext(true);
ContextRestoreResponseHandler<T> responseHandler = new ContextRestoreResponseHandler<>(storedContextSupplier, handler);
// TODO we can probably fold this entire request ID dance into connection.sendRequest but it will be a bigger refactoring
final long requestId = responseHandlers.add(new Transport.ResponseContext<>(responseHandler, connection, action));
final TimeoutHandler timeoutHandler;
if (options.timeout() != null) {
timeoutHandler = new TimeoutHandler(requestId, connection.getNode(), action);
responseHandler.setTimeoutHandler(timeoutHandler);
} else {
timeoutHandler = null;
}
try {
if (lifecycle.stoppedOrClosed()) {
/*
* If we are not started the exception handling will remove the request holder again and calls the handler to notify the
* caller. It will only notify if toStop hasn't done the work yet.
*/
throw new NodeClosedException(localNode);
}
if (timeoutHandler != null) {
assert options.timeout() != null;
timeoutHandler.scheduleTimeout(options.timeout());
}
connection.sendRequest(requestId, action, request, options); // local node optimization happens upstream
} catch (final Exception e) {
// usually happen either because we failed to connect to the node
// or because we failed serializing the message
final Transport.ResponseContext<? extends TransportResponse> contextToNotify = responseHandlers.remove(requestId);
// If contextToNotify == null then handler has already been taken care of.
if (contextToNotify != null) {
if (timeoutHandler != null) {
timeoutHandler.cancel();
}
// callback that an exception happened, but on a different thread since we don't
// want handlers to worry about stack overflows. In the special case of running into a closing node we run on the current
// thread on a best effort basis though.
final SendRequestTransportException sendRequestException = new SendRequestTransportException(node, action, e);
final String executor = lifecycle.stoppedOrClosed() ? ThreadPool.Names.SAME : ThreadPool.Names.GENERIC;
threadPool.executor(executor).execute(new AbstractRunnable() {
@Override
public void onRejection(Exception e) {
// if we get rejected during node shutdown we don't wanna bubble it up
logger.debug(
() -> new ParameterizedMessage(
"failed to notify response handler on rejection, action: {}",
contextToNotify.action()
),
e
);
}
@Override
public void onFailure(Exception e) {
logger.warn(
() -> new ParameterizedMessage(
"failed to notify response handler on exception, action: {}",
contextToNotify.action()
),
e
);
}
@Override
protected void doRun() throws Exception {
contextToNotify.handler().handleException(sendRequestException);
}
});
} else {
logger.debug("Exception while sending request, handler likely already notified due to timeout", e);
}
}
}
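The request-ID bookkeeping in sendRequestInternal follows a remove-and-notify rule: whoever removes the entry from responseHandlers is the one allowed to notify the handler, so a send failure and a timeout can never both fire. Here is a minimal sketch of that rule, assuming a hypothetical PendingRequests class that uses plain strings instead of real responses.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

// Hypothetical registry of in-flight requests, keyed by request id.
final class PendingRequests {
    private final AtomicLong idGenerator = new AtomicLong();
    private final Map<Long, Consumer<String>> handlers = new ConcurrentHashMap<>();

    // Register a handler before sending and get back the id that travels with the request.
    long add(Consumer<String> handler) {
        final long requestId = idGenerator.incrementAndGet();
        handlers.put(requestId, handler);
        return requestId;
    }

    // Remove-and-notify: only the caller that actually removes the entry notifies the handler.
    // Returns false when another path (response, timeout, send failure) got there first.
    boolean complete(long requestId, String outcome) {
        final Consumer<String> handler = handlers.remove(requestId);
        if (handler == null) {
            return false;
        }
        handler.accept(outcome);
        return true;
    }

    int inFlight() {
        return handlers.size();
    }
}
```

The null check in complete plays the same role as the contextToNotify != null check in the catch block above.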
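Then the connection actually sends the request.
connection.sendRequest(requestId, action, request, options);
Here the target is the local node, so the connection is the anonymous Transport.Connection defined in TransportService, and its sendRequest simply calls sendLocalRequest:
private final Transport.Connection localNodeConnection = new Transport.Connection() {
...
};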
@Override
public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options)
throws TransportException {
sendLocalRequest(requestId, action, request, options);
}
private void sendLocalRequest(long requestId, final String action, final TransportRequest request, TransportRequestOptions options) {
// the response channel calls straight back into this TransportService, with no network hop
final DirectResponseChannel channel = new DirectResponseChannel(localNode, action, requestId, this, threadPool);
try {
onRequestSent(localNode, requestId, action, request, options);
onRequestReceived(requestId, action);
@SuppressWarnings("unchecked")
final RequestHandlerRegistry<TransportRequest> reg = (RequestHandlerRegistry<TransportRequest>) getRequestHandler(action);
if (reg == null) {
throw new ActionNotFoundTransportException("Action [" + action + "] not found");
}
final String executor = reg.getExecutor();
if (ThreadPool.Names.SAME.equals(executor)) {
reg.processMessageReceived(request, channel);
} else {
boolean success = false;
request.incRef();
try {
threadPool.executor(executor).execute(new AbstractRunnable() {
// runs on the executor registered for this action's handler
@Override
protected void doRun() throws Exception {
reg.processMessageReceived(request, channel);
}
@Override
public boolean isForceExecution() {
return reg.isForceExecution();
}
@Override
public void onFailure(Exception e) {
try {
channel.sendResponse(e);
} catch (Exception inner) {
inner.addSuppressed(e);
logger.warn(
() -> new ParameterizedMessage("failed to notify channel of error message for action [{}]", action),
inner
);
}
}
@Override
public String toString() {
return "processing of [" + requestId + "][" + action + "]: " + request;
}
@Override
public void onAfter() {
request.decRef();
}
});
success = true;
} finally {
if (success == false) {
request.decRef();
}
}
}
} catch (Exception e) {
try {
channel.sendResponse(e);
} catch (Exception inner) {
inner.addSuppressed(e);
logger.warn(() -> new ParameterizedMessage("failed to notify channel of error message for action [{}]", action), inner);
}
}
}
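The branch on ThreadPool.Names.SAME in sendLocalRequest decides whether the handler runs inline on the calling thread or is handed to a named pool. A minimal sketch of that dispatch follows, assuming a hypothetical MiniDispatcher with a single pool named "get". The real method also manages request reference counts and failure callbacks, which are left out here.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical dispatcher: "same" runs on the caller's thread, any other name goes to a pool.
final class MiniDispatcher implements AutoCloseable {
    static final String SAME = "same";
    private final Map<String, ExecutorService> pools = new HashMap<>();

    MiniDispatcher() {
        // One named pool for illustration; its threads are all called "get-pool".
        pools.put("get", Executors.newFixedThreadPool(2, r -> new Thread(r, "get-pool")));
    }

    void dispatch(String executor, Runnable handler) {
        if (SAME.equals(executor)) {
            handler.run(); // cheap handlers skip the thread handoff entirely
            return;
        }
        final ExecutorService pool = pools.get(executor);
        if (pool == null) {
            throw new IllegalArgumentException("no executor named [" + executor + "]");
        }
        pool.execute(handler);
    }

    // Shut the pools down and wait briefly for queued work to finish.
    @Override
    public void close() {
        for (ExecutorService pool : pools.values()) {
            pool.shutdown();
            try {
                pool.awaitTermination(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
}
```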
doRun is where the core get flow begins. It eventually reaches InternalEngine, the class at the heart of ES's engine layer.
@Override
protected void doRun() throws Exception {
handler.messageReceived(request, channel, task);
}
@Override
public void messageReceived(final Request request, final TransportChannel channel, Task task) throws Exception {
if (logger.isTraceEnabled()) {
logger.trace("executing [{}] on shard [{}]", request, request.internalShardId);
}
// run the shard-level operation asynchronously; the listener replies on the channel
asyncShardOperation(request, request.internalShardId, new ChannelActionListener<>(channel, transportShardAction, request));
}
@Override
protected void asyncShardOperation(GetRequest request, ShardId shardId, ActionListener<GetResponse> listener) throws IOException {
IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
IndexShard indexShard = indexService.getShard(shardId.id());
if (request.realtime()) { // we are not tied to a refresh cycle here anyway
super.asyncShardOperation(request, shardId, listener);
} else {
indexShard.awaitShardSearchActive(b -> {
try {
super.asyncShardOperation(request, shardId, listener);
} catch (Exception ex) {
listener.onFailure(ex);
}
});
}
}
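Now a thread pool takes over: the superclass version submits shardOperation to the executor chosen for this request.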
protected void asyncShardOperation(Request request, ShardId shardId, ActionListener<Response> listener) throws IOException {
threadPool.executor(getExecutor(request, shardId)).execute(ActionRunnable.supply(listener, () -> shardOperation(request, shardId)));
}
@Override
protected GetResponse shardOperation(GetRequest request, ShardId shardId) {
IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
IndexShard indexShard = indexService.getShard(shardId.id());
if (request.refresh() && request.realtime() == false) {
indexShard.refresh("refresh_flag_get");
}
GetResult result = indexShard.getService()
.get(
request.type(),
request.id(),
request.storedFields(),
request.realtime(),
request.version(),
request.versionType(),
request.fetchSourceContext()
);
return new GetResponse(result);
}
public GetResult get(
String type,
String id,
String[] gFields,
boolean realtime,
long version,
VersionType versionType,
FetchSourceContext fetchSourceContext
) {
return get(type, id, gFields, realtime, version, versionType, UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM, fetchSourceContext);
}
In ShardGetService, the call ends up in innerGet:
private GetResult innerGet(
String type,
String id,
String[] gFields,
boolean realtime,
long version,
VersionType versionType,
long ifSeqNo,
long ifPrimaryTerm,
FetchSourceContext fetchSourceContext
) {
fetchSourceContext = normalizeFetchSourceContent(fetchSourceContext, gFields);
if (type == null || type.equals("_all")) {
MappingLookup mappingLookup = mapperService.mappingLookup();
type = mappingLookup.hasMappings() ? mappingLookup.getType() : null;
}
Engine.GetResult get = null;
if (type != null) {
Term uidTerm = new Term(IdFieldMapper.NAME, Uid.encodeId(id));
get = indexShard.get(
new Engine.Get(realtime, realtime, type, id, uidTerm).version(version)
.versionType(versionType)
.setIfSeqNo(ifSeqNo)
.setIfPrimaryTerm(ifPrimaryTerm)
);
if (get.exists() == false) {
get.close();
}
}
if (get == null || get.exists() == false) {
return new GetResult(shardId.getIndexName(), type, id, UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM, -1, false, null, null, null);
}
try {
// break between having loaded it from translog (so we only have _source), and having a document to load
return innerGetLoadFromStoredFields(type, id, gFields, fetchSourceContext, get);
} finally {
get.close();
}
}
Then IndexShard.get:
public Engine.GetResult get(Engine.Get get) {
readAllowed();
MappingLookup mappingLookup = mapperService.mappingLookup();
if (mappingLookup.hasMappings() == false
|| mappingLookup.getType().equals(mapperService.resolveDocumentType(get.type())) == false) {
return GetResult.NOT_EXISTS;
}
// obtain the engine, then delegate the get to it
return getEngine().get(get, mappingLookup, mapperService.documentParser(), this::wrapSearcher);
}
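Done. We'll come back to this method later to see where the document is actually read from, which will also show how ES stores its index. Here is InternalEngine.get: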
@Override
public GetResult get(
Get get,
MappingLookup mappingLookup,
DocumentParser documentParser,
Function<Engine.Searcher, Engine.Searcher> searcherWrapper
) {
assert Objects.equals(get.uid().field(), IdFieldMapper.NAME) : get.uid().field();
try (ReleasableLock ignored = readLock.acquire()) {
ensureOpen();
if (get.realtime()) {
final VersionValue versionValue;
try (Releasable ignore = versionMap.acquireLock(get.uid().bytes())) {
// we need to lock here to access the version map to do this truly in RT
versionValue = getVersionFromMap(get.uid().bytes());
}
if (versionValue != null) {
if (versionValue.isDelete()) {
return GetResult.NOT_EXISTS;
}
if (get.versionType().isVersionConflictForReads(versionValue.version, get.version())) {
throw new VersionConflictEngineException(
shardId,
get.id(),
get.versionType().explainConflictForReads(versionValue.version, get.version())
);
}
if (get.getIfSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO
&& (get.getIfSeqNo() != versionValue.seqNo || get.getIfPrimaryTerm() != versionValue.term)) {
throw new VersionConflictEngineException(
shardId,
get.id(),
get.getIfSeqNo(),
get.getIfPrimaryTerm(),
versionValue.seqNo,
versionValue.term
);
}
if (get.isReadFromTranslog()) {
// this is only used for updates - API _GET calls will always read from a reader for consistency
// the update call doesn't need the consistency since it's source only + _parent but parent can go away in 7.0
if (versionValue.getLocation() != null) {
try {
final Translog.Operation operation = translog.readOperation(versionValue.getLocation());
if (operation != null) {
return getFromTranslog(get, (Translog.Index) operation, mappingLookup, documentParser, searcherWrapper);
}
} catch (IOException e) {
maybeFailEngine("realtime_get", e); // lets check if the translog has failed with a tragic event
throw new EngineException(shardId, "failed to read operation from translog", e);
}
} else {
trackTranslogLocation.set(true);
}
}
assert versionValue.seqNo >= 0 : versionValue;
refreshIfNeeded("realtime_get", versionValue.seqNo);
}
return getFromSearcher(get, acquireSearcher("realtime_get", SearcherScope.INTERNAL, searcherWrapper), false);
} else {
// we expose what has been externally exposed in a point in time snapshot via an explicit refresh
return getFromSearcher(get, acquireSearcher("get", SearcherScope.EXTERNAL, searcherWrapper), false);
}
}
}
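The realtime branch above boils down to this: consult the version map first, return "not found" straight away for a pending delete, refresh if the document changed since the last refresh, then read from the searcher. The toy model below sketches only that decision. MiniEngine and its two maps are hypothetical stand-ins, and the translog path, locking, and version-conflict checks are deliberately omitted.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical toy engine: versionMap holds writes not yet visible to search,
// searchable holds what a searcher would see after the last refresh.
final class MiniEngine {
    // id -> pending source; an empty Optional marks a pending delete (a tombstone).
    private final Map<String, Optional<String>> versionMap = new HashMap<>();
    private final Map<String, String> searchable = new HashMap<>();
    int refreshCount = 0;

    void index(String id, String source) {
        versionMap.put(id, Optional.of(source));
    }

    void delete(String id) {
        versionMap.put(id, Optional.empty());
    }

    // Make every pending write visible to the searcher, then clear the map.
    void refresh() {
        versionMap.forEach((id, pending) -> {
            if (pending.isPresent()) {
                searchable.put(id, pending.get());
            } else {
                searchable.remove(id);
            }
        });
        versionMap.clear();
        refreshCount++;
    }

    Optional<String> get(String id, boolean realtime) {
        if (realtime) {
            final Optional<String> pending = versionMap.get(id);
            if (pending != null) {
                if (pending.isPresent() == false) {
                    return Optional.empty(); // deleted but not yet refreshed
                }
                refresh(); // the doc changed since the last refresh, so refresh before reading
            }
        }
        return Optional.ofNullable(searchable.get(id));
    }
}
```

In this model a non-realtime get never triggers a refresh, so it can return stale data until the next refresh. That is the trade-off the realtime flag controls.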
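Once the result is in hand, it has to travel back. innerGet above wraps it in a GetResult, and shardOperation wraps that GetResult in a GetResponse.
With the GetResponse available, the listener passed to ActionRunnable.supply(listener, () -> shardOperation(request, shardId)) comes into play.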
public static <T> ActionRunnable<T> supply(ActionListener<T> listener, CheckedSupplier<T, Exception> supplier) {
return ActionRunnable.wrap(listener, l -> l.onResponse(supplier.get()));
}
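ActionRunnable.supply turns "compute a value" into a Runnable that reports either the value or the exception to a listener. A minimal sketch of the same idea follows, using a hypothetical MiniListener interface and java.util.concurrent.Callable in place of ActionListener and CheckedSupplier.

```java
import java.util.concurrent.Callable;

// Hypothetical two-callback listener, shaped like ActionListener.
interface MiniListener<T> {
    void onResponse(T response);

    void onFailure(Exception e);
}

final class MiniRunnables {
    private MiniRunnables() {}

    // Wrap a supplier so that its result goes to onResponse and any exception goes to onFailure.
    static <T> Runnable supply(MiniListener<T> listener, Callable<T> supplier) {
        return () -> {
            try {
                listener.onResponse(supplier.call());
            } catch (Exception e) {
                listener.onFailure(e);
            }
        };
    }
}
```

The resulting Runnable can be submitted to any executor, which is how shardOperation ends up running on a thread pool while the caller only deals with the listener.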
@Override
public void onResponse(Response response) {
try {
// channel is a TaskTransportChannel here
channel.sendResponse(response);
} catch (Exception e) {
onFailure(e);
}
}
@Override
public void sendResponse(TransportResponse response) throws IOException {
try {
onTaskFinished.close();
} finally {
// the wrapped channel is the DirectResponseChannel
channel.sendResponse(response);
}
}
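The sendResponse above is a small decorator: release the task bookkeeping first, and forward the response from a finally block so it is sent even if the release throws. Here is a minimal sketch of that wrapper, assuming a hypothetical MiniChannel interface with a single method.

```java
// Hypothetical response channel, reduced to a single method.
interface MiniChannel {
    void sendResponse(String response);
}

// Decorator in the spirit of TaskTransportChannel: finish the task, then always forward.
final class TaskReleasingChannel implements MiniChannel {
    private final MiniChannel delegate;
    private final Runnable onTaskFinished;

    TaskReleasingChannel(MiniChannel delegate, Runnable onTaskFinished) {
        this.delegate = delegate;
        this.onTaskFinished = onTaskFinished;
    }

    @Override
    public void sendResponse(String response) {
        try {
            onTaskFinished.run();
        } finally {
            // Runs even when onTaskFinished throws, so the caller is never left waiting.
            delegate.sendResponse(response);
        }
    }
}
```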
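That is basically the end. In short: a request is built, a thread pool processes it, and when processing finishes the result is sent to the corresponding channel. It is essentially the classic client/server flow. As a next step, summarize each class along the call chain and think about how these experts designed the classes, and why they designed them that way.
The flow is roundabout, but overall it is still a client/server exchange. Fundamentals matter, so make sure you understand Netty thoroughly. The ES code is really top-notch! The next post will pull out the class design, along with some particularly impressive pieces of code, for a closer look.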