es之获取索引的过程

调用get users/_doc/1

1.Netty4HttpPipeliningHandler

@Override
public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
    assert msg instanceof Netty4HttpRequest : "Invalid message type: " + msg.getClass();
    HttpPipelinedRequest pipelinedRequest = aggregator.read(((Netty4HttpRequest) msg));
    ctx.fireChannelRead(pipelinedRequest);
}

服务器读取到数据之后,到了netty的channelRead.在Netty4HttpServerTransport里注册了HttpChannelHandler了.

所以毫无疑问,走到了这个类里面的initChannel方法里.

@Override
protected void initChannel(Channel ch) throws Exception {
    Netty4HttpChannel nettyHttpChannel = new Netty4HttpChannel(ch);
    ch.attr(HTTP_CHANNEL_KEY).set(nettyHttpChannel);
    ch.pipeline().addLast("byte_buf_sizer", NettyByteBufSizer.INSTANCE);
    ch.pipeline().addLast("read_timeout", new ReadTimeoutHandler(transport.readTimeoutMillis, TimeUnit.MILLISECONDS));
    final HttpRequestDecoder decoder = new HttpRequestDecoder(
        handlingSettings.getMaxInitialLineLength(),
        handlingSettings.getMaxHeaderSize(),
        handlingSettings.getMaxChunkSize()
    );
    decoder.setCumulator(ByteToMessageDecoder.COMPOSITE_CUMULATOR);
    ch.pipeline().addLast("decoder", decoder);
    ch.pipeline().addLast("decoder_compress", new HttpContentDecompressor());
    ch.pipeline().addLast("encoder", new HttpResponseEncoder());
    final HttpObjectAggregator aggregator = new HttpObjectAggregator(handlingSettings.getMaxContentLength());
    aggregator.setMaxCumulationBufferComponents(transport.maxCompositeBufferComponents);
    ch.pipeline().addLast("aggregator", aggregator);
    if (handlingSettings.isCompression()) {
        ch.pipeline().addLast("encoder_compress", new HttpContentCompressor(handlingSettings.getCompressionLevel()));
    }
    ch.pipeline().addLast("request_creator", requestCreator);
    ch.pipeline().addLast("response_creator", responseCreator);
    ch.pipeline().addLast("pipelining", new Netty4HttpPipeliningHandler(logger, transport.pipeliningMaxEvents));
    ch.pipeline().addLast("handler", requestHandler);
    transport.serverAcceptedChannel(nettyHttpChannel);

上面这个方法里,就是netty的典型用法了.添加一些服务器端的handler.从这些名字很容易知晓handler的用法.我们直接看最后一个handler.

private final Netty4HttpRequestHandler requestHandler;

然后我们进入到channelRead0里面.

@Override
protected void channelRead0(ChannelHandlerContext ctx, HttpPipelinedRequest httpRequest) {
    assert Transports.assertDefaultThreadContext(serverTransport.getThreadPool().getThreadContext());
    assert Transports.assertTransportThread();
    final Netty4HttpChannel channel = ctx.channel().attr(Netty4HttpServerTransport.HTTP_CHANNEL_KEY).get();
    boolean success = false;
    try {
            //
        serverTransport.incomingRequest(httpRequest, channel);
        success = true;
    } finally {
        if (success == false) {
            httpRequest.release();
        }
    }
}

然后我们进入incomingRequest里

    public void incomingRequest(final HttpRequest httpRequest, final HttpChannel httpChannel) {
        httpClientStatsTracker.updateClientStats(httpRequest, httpChannel);
        final long startTime = threadPool.relativeTimeInMillis();
        try {
            handleIncomingRequest(httpRequest, httpChannel, httpRequest.getInboundException());
        } finally {
            final long took = threadPool.relativeTimeInMillis() - startTime;
            final long logThreshold = slowLogThresholdMs;
            if (logThreshold > 0 && took > logThreshold) {
                logger.warn(
                    "handling request [{}][{}][{}][{}] took [{}ms] which is above the warn threshold of [{}ms]",
                    httpRequest.header(Task.X_OPAQUE_ID_HTTP_HEADER),
                    httpRequest.method(),
                    httpRequest.uri(),
                    httpChannel,
                    took,
                    logThreshold
                );
            }
        }
    }

然后到handleIncomingRequest里

httpRequest长这个样子

image.png
  private void handleIncomingRequest(final HttpRequest httpRequest, final HttpChannel httpChannel, final Exception exception) {
        if (exception == null) {
            HttpResponse earlyResponse = corsHandler.handleInbound(httpRequest);
            if (earlyResponse != null) {
                httpChannel.sendResponse(earlyResponse, earlyResponseListener(httpRequest, httpChannel));
                httpRequest.release();
                return;
            }
        }

        Exception badRequestCause = exception;

        /*
         * We want to create a REST request from the incoming request from Netty. However, creating this request could fail if there
         * are incorrectly encoded parameters, or the Content-Type header is invalid. If one of these specific failures occurs, we
         * attempt to create a REST request again without the input that caused the exception (e.g., we remove the Content-Type header,
         * or skip decoding the parameters). Once we have a request in hand, we then dispatch the request as a bad request with the
         * underlying exception that caused us to treat the request as bad.
         */
        final RestRequest restRequest;
        {
            RestRequest innerRestRequest;
            try {
                innerRestRequest = RestRequest.request(xContentRegistry, httpRequest, httpChannel);
            } catch (final RestRequest.ContentTypeHeaderException e) {
                badRequestCause = ExceptionsHelper.useOrSuppress(badRequestCause, e);
                innerRestRequest = requestWithoutContentTypeHeader(httpRequest, httpChannel, badRequestCause);
            } catch (final RestRequest.BadParameterException e) {
                badRequestCause = ExceptionsHelper.useOrSuppress(badRequestCause, e);
                innerRestRequest = RestRequest.requestWithoutParameters(xContentRegistry, httpRequest, httpChannel);
            }
            restRequest = innerRestRequest;
        }

        final HttpTracer trace = tracer.maybeTraceRequest(restRequest, exception);

        /*
         * We now want to create a channel used to send the response on. However, creating this channel can fail if there are invalid
         * parameter values for any of the filter_path, human, or pretty parameters. We detect these specific failures via an
         * IllegalArgumentException from the channel constructor and then attempt to create a new channel that bypasses parsing of these
         * parameter values.
         */
        final RestChannel channel;
        {
            RestChannel innerChannel;
            ThreadContext threadContext = threadPool.getThreadContext();
            try {
                innerChannel = new DefaultRestChannel(
                    httpChannel,
                    httpRequest,
                    restRequest,
                    bigArrays,
                    handlingSettings,
                    threadContext,
                    corsHandler,
                    trace
                );
            } catch (final IllegalArgumentException e) {
                badRequestCause = ExceptionsHelper.useOrSuppress(badRequestCause, e);
                final RestRequest innerRequest = RestRequest.requestWithoutParameters(xContentRegistry, httpRequest, httpChannel);
                innerChannel = new DefaultRestChannel(
                    httpChannel,
                    httpRequest,
                    innerRequest,
                    bigArrays,
                    handlingSettings,
                    threadContext,
                    corsHandler,
                    trace
                );
            }
            channel = innerChannel;
        }
                //分发请求
        dispatchRequest(restRequest, channel, badRequestCause);
    }
// Visible for testing
void dispatchRequest(final RestRequest restRequest, final RestChannel channel, final Throwable badRequestCause) {
    final ThreadContext threadContext = threadPool.getThreadContext();
    try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
        if (badRequestCause != null) {
            dispatcher.dispatchBadRequest(channel, threadContext, badRequestCause);
        } else {
            dispatcher.dispatchRequest(restRequest, channel, threadContext);
        }
    }
}
protected final Dispatcher dispatcher;

Dispatcher是一个接口,这个方法的实现类是RestController.

    interface Dispatcher {

        /**
         * Dispatches the {@link RestRequest} to the relevant request handler or responds to the given rest channel directly if
         * the request can't be handled by any request handler.
         *
         * @param request       the request to dispatch
         * @param channel       the response channel of this request
         * @param threadContext the thread context
         */
        void dispatchRequest(RestRequest request, RestChannel channel, ThreadContext threadContext);

        /**
         * Dispatches a bad request. For example, if a request is malformed it will be dispatched via this method with the cause of the bad
         * request.
         *
         * @param channel       the response channel of this request
         * @param threadContext the thread context
         * @param cause         the cause of the bad request
         */
        void dispatchBadRequest(RestChannel channel, ThreadContext threadContext, Throwable cause);

    }

然后我们看RestController里的方法.

    @Override
    public void dispatchRequest(RestRequest request, RestChannel channel, ThreadContext threadContext) {
        //
        threadContext.addResponseHeader(ELASTIC_PRODUCT_HTTP_HEADER, ELASTIC_PRODUCT_HTTP_HEADER_VALUE);
        try {
            //
            tryAllHandlers(request, channel, threadContext);
        } catch (Exception e) {
            try {
                channel.sendResponse(new BytesRestResponse(channel, e));
            } catch (Exception inner) {
                inner.addSuppressed(e);
                logger.error(() -> new ParameterizedMessage("failed to send failure response for uri [{}]", request.uri()), inner);
            }
        }
    }

进入到tryAllHandlers.


 private void tryAllHandlers(final RestRequest request, final RestChannel channel, final ThreadContext threadContext) 
   throws Exception {
        try {
            copyRestHeaders(request, threadContext);
            validateErrorTrace(request, channel);
        } catch (IllegalArgumentException e) {
            channel.sendResponse(BytesRestResponse.createSimpleErrorResponse(channel, BAD_REQUEST, e.getMessage()));
            return;
        }
                //rawPath = /users/_doc/1
        final String rawPath = request.rawPath();
        final String uri = request.uri();
        final RestRequest.Method requestMethod;
        try {
            // Resolves the HTTP method and fails if the method is invalid
            requestMethod = request.method();
            // Loop through all possible handlers, attempting to dispatch the request
            Iterator<MethodHandlers> allHandlers = getAllHandlers(request.params(), rawPath);
            while (allHandlers.hasNext()) {
                final RestHandler handler;
                final MethodHandlers handlers = allHandlers.next();
                if (handlers == null) {
                    handler = null;
                } else {
                    handler = handlers.getHandler(requestMethod);
                }
                if (handler == null) {
                    if (handleNoHandlerFound(rawPath, requestMethod, uri, channel)) {
                        return;
                    }
                } else {
                    dispatchRequest(request, channel, handler, threadContext);
                    return;
                }
            }
        } catch (final IllegalArgumentException e) {
            handleUnsupportedHttpMethod(uri, null, channel, getValidHandlerMethodSet(rawPath), e);
            return;
        }
        // If request has not been handled, fallback to a bad request error.
        handleBadRequest(uri, requestMethod, channel);
    }
private void dispatchRequest(RestRequest request, RestChannel channel, RestHandler handler, ThreadContext threadContext)
    throws Exception {
    final int contentLength = request.contentLength();
    if (contentLength > 0) {
        final XContentType xContentType = request.getXContentType();
        if (xContentType == null) {
            sendContentTypeErrorMessage(request.getAllHeaderValues("Content-Type"), channel);
            return;
        }
        if (handler.supportsContentStream() && xContentType != XContentType.JSON && xContentType != XContentType.SMILE) {
            channel.sendResponse(
                BytesRestResponse.createSimpleErrorResponse(
                    channel,
                    RestStatus.NOT_ACCEPTABLE,
                    "Content-Type [" + xContentType + "] does not support stream parsing. Use JSON or SMILE instead"
                )
            );
            return;
        }
    }
    RestChannel responseChannel = channel;
    try {
        if (handler.canTripCircuitBreaker()) {
            inFlightRequestsBreaker(circuitBreakerService).addEstimateBytesAndMaybeBreak(contentLength, "<http_request>");
        } else {
            inFlightRequestsBreaker(circuitBreakerService).addWithoutBreaking(contentLength);
        }
        // iff we could reserve bytes for the request we need to send the response also over this channel
        responseChannel = new ResourceHandlingHttpChannel(channel, circuitBreakerService, contentLength);
        // TODO: Count requests double in the circuit breaker if they need copying?
        if (handler.allowsUnsafeBuffers() == false) {
            request.ensureSafeBuffers();
        }

        if (handler.allowSystemIndexAccessByDefault() == false) {
            // The ELASTIC_PRODUCT_ORIGIN_HTTP_HEADER indicates that the request is coming from an Elastic product and
            // therefore we should allow a subset of external system index access.
            // This header is intended for internal use only.
            final String prodOriginValue = request.header(Task.X_ELASTIC_PRODUCT_ORIGIN_HTTP_HEADER);
            if (prodOriginValue != null) {
                threadContext.putHeader(SYSTEM_INDEX_ACCESS_CONTROL_HEADER_KEY, Boolean.TRUE.toString());
                threadContext.putHeader(EXTERNAL_SYSTEM_INDEX_ACCESS_CONTROL_HEADER_KEY, prodOriginValue);
            } else {
                threadContext.putHeader(SYSTEM_INDEX_ACCESS_CONTROL_HEADER_KEY, Boolean.FALSE.toString());
            }
        } else {
            threadContext.putHeader(SYSTEM_INDEX_ACCESS_CONTROL_HEADER_KEY, Boolean.TRUE.toString());
        }

        handler.handleRequest(request, responseChannel, client);
    } catch (Exception e) {
        responseChannel.sendResponse(new BytesRestResponse(responseChannel, e));
    }
}

然后进入到 handler.handleRequest(request, responseChannel, client);

handler = SecurityRestFilter

 @Override
    public void handleRequest(RestRequest request, RestChannel channel, NodeClient client) throws Exception {
        if (licenseState.isSecurityEnabled() && request.method() != Method.OPTIONS) {
            // CORS - allow for preflight unauthenticated OPTIONS request
            if (extractClientCertificate) {
                HttpChannel httpChannel = request.getHttpChannel();
                SSLEngineUtils.extractClientCertificates(logger, threadContext, httpChannel);
            }

            final String requestUri = request.uri();
            authenticationService.authenticate(maybeWrapRestRequest(request), ActionListener.wrap(authentication -> {
                if (authentication == null) {
                    logger.trace("No authentication available for REST request [{}]", requestUri);
                } else {
                    logger.trace("Authenticated REST request [{}] as {}", requestUri, authentication);
                }
                secondaryAuthenticator.authenticateAndAttachToContext(request, ActionListener.wrap(secondaryAuthentication -> {
                    if (secondaryAuthentication != null) {
                        logger.trace("Found secondary authentication {} in REST request [{}]", secondaryAuthentication, requestUri);
                    }
                    RemoteHostHeader.process(request, threadContext);
                    restHandler.handleRequest(request, channel, client);
                }, e -> handleException("Secondary authentication", request, channel, e)));
            }, e -> handleException("Authentication", request, channel, e)));
        } else {
            if (request.method() != Method.OPTIONS) {
                HeaderWarning.addWarning(
                    "Elasticsearch built-in security features are not enabled. Without "
                        + "authentication, your cluster could be accessible to anyone. See "
                        + "https://www.elastic.co/guide/en/elasticsearch/reference/"
                        + Version.CURRENT.major
                        + "."
                        + Version.CURRENT.minor
                        + "/security-minimal-setup.html to enable security."
                );
            }
            restHandler.handleRequest(request, channel, client);
        }
    }

restHandler = RestGetAction

然后走到BaseRestHandler里的handleRequest.

@Override
public final void handleRequest(RestRequest request, RestChannel channel, NodeClient client) throws Exception {
    // prepare the request for execution; has the side effect of touching the request parameters
    final RestChannelConsumer action = prepareRequest(request, client);

    // validate unconsumed params, but we must exclude params used to format the response
    // use a sorted set so the unconsumed parameters appear in a reliable sorted order
    final SortedSet<String> unconsumedParams = request.unconsumedParams()
        .stream()
        .filter(p -> responseParams().contains(p) == false)
        .collect(Collectors.toCollection(TreeSet::new));

    // validate the non-response params
    if (unconsumedParams.isEmpty() == false) {
        final Set<String> candidateParams = new HashSet<>();
        candidateParams.addAll(request.consumedParams());
        candidateParams.addAll(responseParams());
        throw new IllegalArgumentException(unrecognized(request, unconsumedParams, candidateParams, "parameter"));
    }

    if (request.hasContent() && request.isContentConsumed() == false) {
        throw new IllegalArgumentException("request [" + request.method() + " " + request.path() + "] does not support having a body");
    }

    usageCount.increment();
    // execute the action
    action.accept(channel);
}

这把才到RestGetAction里.

@Override
public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
    GetRequest getRequest;
    if (request.hasParam("type")) {
        deprecationLogger.critical(DeprecationCategory.TYPES, "get_with_types", TYPES_DEPRECATION_MESSAGE);
        getRequest = new GetRequest(request.param("index"), request.param("type"), request.param("id"));
    } else {
        getRequest = new GetRequest(request.param("index"), request.param("id"));
    }

    getRequest.refresh(request.paramAsBoolean("refresh", getRequest.refresh()));
    getRequest.routing(request.param("routing"));
    getRequest.preference(request.param("preference"));
    getRequest.realtime(request.paramAsBoolean("realtime", getRequest.realtime()));
    if (request.param("fields") != null) {
        throw new IllegalArgumentException(
            "the parameter [fields] is no longer supported, "
                + "please use [stored_fields] to retrieve stored fields or [_source] to load the field from _source"
        );
    }
    final String fieldsParam = request.param("stored_fields");
    if (fieldsParam != null) {
        final String[] fields = Strings.splitStringByCommaToArray(fieldsParam);
        if (fields != null) {
            getRequest.storedFields(fields);
        }
    }

    getRequest.version(RestActions.parseVersion(request));
    getRequest.versionType(VersionType.fromString(request.param("version_type"), getRequest.versionType()));

    getRequest.fetchSourceContext(FetchSourceContext.parseFromRestRequest(request));

    return channel -> client.get(getRequest, new RestToXContentListener<GetResponse>(channel) {
        @Override
        protected RestStatus getStatus(final GetResponse response) {
            return response.isExists() ? OK : NOT_FOUND;
        }
    });
}

然后client.get.这就是核心的方法了吧.client是NodeClient.

这个get先走到AbstractClient的get方法.

   @Override
    public void get(final GetRequest request, final ActionListener<GetResponse> listener) {
        execute(GetAction.INSTANCE, request, listener);
    }

然后到私有方法.

  @Override
    public final <Request extends ActionRequest, Response extends ActionResponse> void execute(
        ActionType<Response> action,
        Request request,
        ActionListener<Response> listener
    ) {
        try {
            listener = threadedWrapper.wrap(listener);
            doExecute(action, request, listener);
        } catch (Exception e) {
            assert false : new AssertionError(e);
            listener.onFailure(e);
        }
    }

然后doExecute.是个私有方法

    protected abstract <Request extends ActionRequest, Response extends ActionResponse> void doExecute(
        ActionType<Response> action,
        Request request,
        ActionListener<Response> listener
    );
@Override
public <Request extends ActionRequest, Response extends ActionResponse> void doExecute(
    ActionType<Response> action,
    Request request,
    ActionListener<Response> listener
) {
    // Discard the task because the Client interface doesn't use it.
    try {
        executeLocally(action, request, listener);
    } catch (TaskCancelledException | IllegalArgumentException | IllegalStateException e) {
        // #executeLocally returns the task and throws TaskCancelledException if it fails to register the task because the parent
        // task has been cancelled, IllegalStateException if the client was not in a state to execute the request because it was not
        // yet properly initialized or IllegalArgumentException if header validation fails we forward them to listener since this API
        // does not concern itself with the specifics of the task handling
        listener.onFailure(e);
    }
}

然后再到私有方法.

    public <Request extends ActionRequest, Response extends ActionResponse> Task executeLocally(
        ActionType<Response> action,
        Request request,
        ActionListener<Response> listener
    ) {

        return transportAction(action).execute(request, listener);
    }

先封装成一个TransportAction.然后在执行get方法.

  /**
     * Get the {@link TransportAction} for an {@link ActionType}, throwing exceptions if the action isn't available.
     */
    private <Request extends ActionRequest, Response extends ActionResponse> TransportAction<Request, Response> transportAction(
        ActionType<Response> action
    ) {
        if (actions == null) {
            throw new IllegalStateException("NodeClient has not been initialized");
        }
        @SuppressWarnings("unchecked")
        TransportAction<Request, Response> transportAction = (TransportAction<Request, Response>) actions.get(action);
        if (transportAction == null) {
            throw new IllegalStateException("failed to find action [" + action + "] to execute");
        }
        return transportAction;
    }

这把到了TransportAction里了.

public final Task execute(Request request, ActionListener<Response> listener) {
    /*
     * While this version of execute could delegate to the TaskListener
     * version of execute that'd add yet another layer of wrapping on the
     * listener and prevent us from using the listener bare if there isn't a
     * task. That just seems like too many objects. Thus the two versions of
     * this method.
     */
    final Releasable unregisterChildNode = registerChildNode(request.getParentTask());
    final Task task;
    try {
        task = taskManager.register("transport", actionName, request);
    } catch (TaskCancelledException e) {
        unregisterChildNode.close();
        throw e;
    }
    execute(task, request, new ActionListener<Response>() {
        @Override
        public void onResponse(Response response) {
            try {
                Releasables.close(unregisterChildNode, () -> taskManager.unregister(task));
            } finally {
                //接受请求的返回值.
                listener.onResponse(response);
            }
        }

        @Override
        public void onFailure(Exception e) {
            try {
                Releasables.close(unregisterChildNode, () -> taskManager.unregister(task));
            } finally {
                listener.onFailure(e);
            }
        }
    });
    return task;
}

我们来看看execute.

/**
 * Use this method when the transport action should continue to run in the context of the current task
 */
public final void execute(Task task, Request request, ActionListener<Response> listener) {
    ActionRequestValidationException validationException = request.validate();
    if (validationException != null) {
        listener.onFailure(validationException);
        return;
    }

    if (task != null && request.getShouldStoreResult()) {
        listener = new TaskResultStoringActionListener<>(taskManager, task, listener);
    }
        //构造一个FilterChain.
    RequestFilterChain<Request, Response> requestFilterChain = new RequestFilterChain<>(this, logger);
    //然后开始处理.
    requestFilterChain.proceed(task, actionName, request, listener);
}

然后到RequestFilterChain里的proceed.

@Override
public void proceed(Task task, String actionName, Request request, ActionListener<Response> listener) {
    int i = index.getAndIncrement();
    try {
            //什么时候会走到这个分支,后面再仔细看看,先把主线拎清楚.
        if (i < this.action.filters.length) {
            this.action.filters[i].apply(task, actionName, request, listener, this);
        } else if (i == this.action.filters.length) {
            this.action.doExecute(task, request, listener);
        } else {
            listener.onFailure(new IllegalStateException("proceed was called too many times"));
        }
    } catch (Exception e) {
        logger.trace("Error during transport action execution.", e);
        listener.onFailure(e);
    }
}

然后走到this.action.doExecute(task, request, listener);

TransportReplicationAction

    @Override
    protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
        assert request.shardId() != null : "request shardId must be set";
        runReroutePhase(task, request, listener, true);
    }
   private void runReroutePhase(Task task, Request request, ActionListener<Response> listener, boolean initiatedByNodeClient) {
        try {
            new ReroutePhase((ReplicationTask) task, request, listener, initiatedByNodeClient).run();
        } catch (RuntimeException e) {
            listener.onFailure(e);
        }
    }

ReroutePhase是一个Runnable.run方法在AbstractRunnable里.doRun由子类来实现.

    @Override
    public final void run() {
        try {
            doRun();
        } catch (Exception t) {
            onFailure(t);
        } finally {
            onAfter();
        }
    }

这个方法有点复杂.

@Override
protected void doRun() {
    setPhase(task, "routing");
    final ClusterState state = observer.setAndGetObservedState();
    final ClusterBlockException blockException = blockExceptions(state, request.shardId().getIndexName());
    if (blockException != null) {
        if (blockException.retryable()) {
            logger.trace("cluster is blocked, scheduling a retry", blockException);
            retry(blockException);
        } else {
            finishAsFailed(blockException);
        }
    } else {
        //索引元数据.
        final IndexMetadata indexMetadata = state.metadata().index(request.shardId().getIndex());
        if (indexMetadata == null) {
            // ensure that the cluster state on the node is at least as high as the node that decided that the index was there
            if (state.version() < request.routedBasedOnClusterVersion()) {
                logger.trace(
                    "failed to find index [{}] for request [{}] despite sender thinking it would be here. "
                        + "Local cluster state version [{}]] is older than on sending node (version [{}]), scheduling a retry...",
                    request.shardId().getIndex(),
                    request,
                    state.version(),
                    request.routedBasedOnClusterVersion()
                );
                retry(
                    new IndexNotFoundException(
                        "failed to find index as current cluster state with version ["
                            + state.version()
                            + "] is stale (expected at least ["
                            + request.routedBasedOnClusterVersion()
                            + "]",
                        request.shardId().getIndexName()
                    )
                );
                return;
            } else {
                finishAsFailed(new IndexNotFoundException(request.shardId().getIndex()));
                return;
            }
        }

        if (indexMetadata.getState() == IndexMetadata.State.CLOSE) {
            finishAsFailed(new IndexClosedException(indexMetadata.getIndex()));
            return;
        }

        if (request.waitForActiveShards() == ActiveShardCount.DEFAULT) {
            // if the wait for active shard count has not been set in the request,
            // resolve it from the index settings
            request.waitForActiveShards(indexMetadata.getWaitForActiveShards());
        }
        assert request.waitForActiveShards() != ActiveShardCount.DEFAULT
            : "request waitForActiveShards must be set in resolveRequest";
                //分片路由
        final ShardRouting primary = state.getRoutingTable().shardRoutingTable(request.shardId()).primaryShard();
        if (primary == null || primary.active() == false) {
            logger.trace(
                "primary shard [{}] is not yet active, scheduling a retry: action [{}], request [{}], "
                    + "cluster state version [{}]",
                request.shardId(),
                actionName,
                request,
                state.version()
            );
            retryBecauseUnavailable(request.shardId(), "primary shard is not active");
            return;
        }
        if (state.nodes().nodeExists(primary.currentNodeId()) == false) {
            logger.trace(
                "primary shard [{}] is assigned to an unknown node [{}], scheduling a retry: action [{}], request [{}], "
                    + "cluster state version [{}]",
                request.shardId(),
                primary.currentNodeId(),
                actionName,
                request,
                state.version()
            );
            retryBecauseUnavailable(request.shardId(), "primary shard isn't assigned to a known node.");
            return;
        }
        final DiscoveryNode node = state.nodes().get(primary.currentNodeId());
        //因为我本地只起了一个服务,那么就会走到local里.
        if (primary.currentNodeId().equals(state.nodes().getLocalNodeId())) {
            performLocalAction(state, primary, node, indexMetadata);
        } else {
            performRemoteAction(state, primary, node);
        }
    }
}
private void performLocalAction(ClusterState state, ShardRouting primary, DiscoveryNode node, IndexMetadata indexMetadata) {
    setPhase(task, "waiting_on_primary");
    if (logger.isTraceEnabled()) {
        logger.trace(
            "send action [{}] to local primary [{}] for request [{}] with cluster state version [{}] to [{}] ",
            transportPrimaryAction,
            request.shardId(),
            request,
            state.version(),
            primary.currentNodeId()
        );
    }
    //执行动作
    performAction(
        node,
        transportPrimaryAction,
        true,
        new ConcreteShardRequest<>(
            request,
            primary.allocationId().getId(),
            indexMetadata.primaryTerm(primary.id()),
            true,
            initiatedByNodeClient
        )
    );
}
private void performAction(
    final DiscoveryNode node,
    final String action,
    final boolean isPrimaryAction,
    final TransportRequest requestToPerform
) {
    //走到发送请求到服务器的方法了.
    transportService.sendRequest(node, action, requestToPerform, transportOptions, new TransportResponseHandler<Response>() {

        @Override
        public Response read(StreamInput in) throws IOException {
            return newResponseInstance(in);
        }

        @Override
        public void handleResponse(Response response) {
            finishOnSuccess(response);
        }

        @Override
        public void handleException(TransportException exp) {
            try {
                // if we got disconnected from the node, or the node / shard is not in the right state (being closed)
                final Throwable cause = exp.unwrapCause();
                if (cause instanceof ConnectTransportException
                    || cause instanceof NodeClosedException
                    || (isPrimaryAction && retryPrimaryException(cause))) {
                    logger.trace(
                        () -> new ParameterizedMessage(
                            "received an error from node [{}] for request [{}], scheduling a retry",
                            node.getId(),
                            requestToPerform
                        ),
                        exp
                    );
                    retry(exp);
                } else {
                    finishAsFailed(exp);
                }
            } catch (Exception e) {
                e.addSuppressed(exp);
                finishWithUnexpectedFailure(e);
            }
        }
    });

我们到transportService里看看

public final <T extends TransportResponse> void sendRequest(
    final DiscoveryNode node,
    final String action,
    final TransportRequest request,
    final TransportRequestOptions options,
    TransportResponseHandler<T> handler
) {
    final Transport.Connection connection;
    try {
        //获取连接.
        connection = getConnection(node);
    } catch (final NodeNotConnectedException ex) {
        // the caller might not handle this so we invoke the handler
        handler.handleException(ex);
        return;
    }
    //发送请求.
    sendRequest(connection, action, request, options, handler);
}
/**
 * Sends a request on the specified connection. If there is a failure sending the request, the specified handler is invoked.
 *
 * @param connection the connection to send the request on
 * @param action     the name of the action
 * @param request    the request
 * @param options    the options for this request
 * @param handler    the response handler
 * @param <T>        the type of the transport response
 */
public final <T extends TransportResponse> void sendRequest(
    final Transport.Connection connection,
    final String action,
    final TransportRequest request,
    final TransportRequestOptions options,
    final TransportResponseHandler<T> handler
) {
    try {
        final TransportResponseHandler<T> delegate;
        if (request.getParentTask().isSet()) {
            // If the connection is a proxy connection, then we will create a cancellable proxy task on the proxy node and an actual
            // child task on the target node of the remote cluster.
            // ----> a parent task on the local cluster
            // |
            // ----> a proxy task on the proxy node on the remote cluster
            // |
            // ----> an actual child task on the target node on the remote cluster
            // To cancel the child task on the remote cluster, we must send a cancel request to the proxy node instead of the target
            // node as the parent task of the child task is the proxy task not the parent task on the local cluster. Hence, here we
            // unwrap the connection and keep track of the connection to the proxy node instead of the proxy connection.
            final Transport.Connection unwrappedConn = unwrapConnection(connection);
            final Releasable unregisterChildNode = taskManager.registerChildConnection(request.getParentTask().getId(), unwrappedConn);
            delegate = new TransportResponseHandler<T>() {
                @Override
                public void handleResponse(T response) {
                    unregisterChildNode.close();
                    handler.handleResponse(response);
                }

                @Override
                public void handleException(TransportException exp) {
                    unregisterChildNode.close();
                    handler.handleException(exp);
                }

                @Override
                public String executor() {
                    return handler.executor();
                }

                @Override
                public T read(StreamInput in) throws IOException {
                    return handler.read(in);
                }

                @Override
                public String toString() {
                    return getClass().getName() + "/[" + action + "]:" + handler.toString();
                }
            };
        } else {
            delegate = handler;
        }
        //异步发送.
        asyncSender.sendRequest(connection, action, request, options, delegate);
    } catch (final Exception ex) {
        // the caller might not handle this so we invoke the handler
        final TransportException te;
        if (ex instanceof TransportException) {
            te = (TransportException) ex;
        } else {
            te = new SendRequestTransportException(connection.getNode(), action, ex);
        }
        handler.handleException(te);
    }
}

然后到了异步发送阶段了.

 private final TransportInterceptor.AsyncSender asyncSender;

异步发送肯定是有一个线程池在处理这些请求.
然后到

 @Override
    public AsyncSender interceptSender(AsyncSender sender) {
        return new AsyncSender() {
            @Override
            public <T extends TransportResponse> void sendRequest(
                Transport.Connection connection,
                String action,
                TransportRequest request,
                TransportRequestOptions options,
                TransportResponseHandler<T> handler
            ) {
                final boolean requireAuth = shouldRequireExistingAuthentication();
                // the transport in core normally does this check, BUT since we are serializing to a string header we need to do it
                // ourselves otherwise we wind up using a version newer than what we can actually send
                final Version minVersion = Version.min(connection.getVersion(), Version.CURRENT);

                // Sometimes a system action gets executed like a internal create index request or update mappings request
                // which means that the user is copied over to system actions so we need to change the user
                if (AuthorizationUtils.shouldReplaceUserWithSystem(threadPool.getThreadContext(), action)) {
                    securityContext.executeAsUser(
                        SystemUser.INSTANCE,
                        (original) -> sendWithUser(
                            connection,
                            action,
                            request,
                            options,
                            new ContextRestoreResponseHandler<>(threadPool.getThreadContext().wrapRestorable(original), handler),
                            sender,
                            requireAuth
                        ),
                        minVersion
                    );
                } else if (AuthorizationUtils.shouldSetUserBasedOnActionOrigin(threadPool.getThreadContext())) {
                    AuthorizationUtils.switchUserBasedOnActionOriginAndExecute(
                        threadPool.getThreadContext(),
                        securityContext,
                        (original) -> sendWithUser(
                            connection,
                            action,
                            request,
                            options,
                            new ContextRestoreResponseHandler<>(threadPool.getThreadContext().wrapRestorable(original), handler),
                            sender,
                            requireAuth
                        )
                    );
                } else if (securityContext.getAuthentication() != null
                    && securityContext.getAuthentication().getVersion().equals(minVersion) == false) {
                        // re-write the authentication since we want the authentication version to match the version of the connection
                        securityContext.executeAfterRewritingAuthentication(
                            original -> sendWithUser(
                                connection,
                                action,
                                request,
                                options,
                                new ContextRestoreResponseHandler<>(threadPool.getThreadContext().wrapRestorable(original), handler),
                                sender,
                                requireAuth
                            ),
                            minVersion
                        );
                    } else {
                        sendWithUser(connection, action, request, options, handler, sender, requireAuth);
                    }
            }
        };
    }

然后到securityContext.executeAsUser.

 public void executeAsUser(User user, Consumer<StoredContext> consumer, Version version) {
        final StoredContext original = threadContext.newStoredContext(true);
        try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
            setUser(user, version);
            consumer.accept(original);
        }
    }

然后到sendWithUser.

private <T extends TransportResponse> void sendWithUser(
    Transport.Connection connection,
    String action,
    TransportRequest request,
    TransportRequestOptions options,
    TransportResponseHandler<T> handler,
    AsyncSender sender,
    final boolean requireAuthentication
) {
    if (securityContext.getAuthentication() == null && requireAuthentication) {
        // we use an assertion here to ensure we catch this in our testing infrastructure, but leave the ISE for cases we do not catch
        // in tests and may be hit by a user
        assertNoAuthentication(action);
        throw new IllegalStateException("there should always be a user when sending a message for action [" + action + "]");
    }

    try {

        sender.sendRequest(connection, action, request, options, handler);
    } catch (Exception e) {
        handler.handleException(new SendRequestTransportException(connection.getNode(), action, e));
    }
}

这个sender是一个lambada表达式,在TransportService里.

this.asyncSender = interceptor.interceptSender(this::sendRequestInternal);

然后走到sendRequestInternal里.

 private <T extends TransportResponse> void sendRequestInternal(
        final Transport.Connection connection,
        final String action,
        final TransportRequest request,
        final TransportRequestOptions options,
        TransportResponseHandler<T> handler
    ) {
        if (connection == null) {
            throw new IllegalStateException("can't send request to a null connection");
        }
        DiscoveryNode node = connection.getNode();

        Supplier<ThreadContext.StoredContext> storedContextSupplier = threadPool.getThreadContext().newRestorableContext(true);
        ContextRestoreResponseHandler<T> responseHandler = new ContextRestoreResponseHandler<>(storedContextSupplier, handler);
        // TODO we can probably fold this entire request ID dance into connection.sendReqeust but it will be a bigger refactoring
        final long requestId = responseHandlers.add(new Transport.ResponseContext<>(responseHandler, connection, action));
        final TimeoutHandler timeoutHandler;
        if (options.timeout() != null) {
            timeoutHandler = new TimeoutHandler(requestId, connection.getNode(), action);
            responseHandler.setTimeoutHandler(timeoutHandler);
        } else {
            timeoutHandler = null;
        }
        try {
            if (lifecycle.stoppedOrClosed()) {
                /*
                 * If we are not started the exception handling will remove the request holder again and calls the handler to notify the
                 * caller. It will only notify if toStop hasn't done the work yet.
                 */
                throw new NodeClosedException(localNode);
            }
            if (timeoutHandler != null) {
                assert options.timeout() != null;
                timeoutHandler.scheduleTimeout(options.timeout());
            }
            connection.sendRequest(requestId, action, request, options); // local node optimization happens upstream
        } catch (final Exception e) {
            // usually happen either because we failed to connect to the node
            // or because we failed serializing the message
            final Transport.ResponseContext<? extends TransportResponse> contextToNotify = responseHandlers.remove(requestId);
            // If holderToNotify == null then handler has already been taken care of.
            if (contextToNotify != null) {
                if (timeoutHandler != null) {
                    timeoutHandler.cancel();
                }
                // callback that an exception happened, but on a different thread since we don't
                // want handlers to worry about stack overflows. In the special case of running into a closing node we run on the current
                // thread on a best effort basis though.
                final SendRequestTransportException sendRequestException = new SendRequestTransportException(node, action, e);
                final String executor = lifecycle.stoppedOrClosed() ? ThreadPool.Names.SAME : ThreadPool.Names.GENERIC;
                threadPool.executor(executor).execute(new AbstractRunnable() {
                    @Override
                    public void onRejection(Exception e) {
                        // if we get rejected during node shutdown we don't wanna bubble it up
                        logger.debug(
                            () -> new ParameterizedMessage(
                                "failed to notify response handler on rejection, action: {}",
                                contextToNotify.action()
                            ),
                            e
                        );
                    }

                    @Override
                    public void onFailure(Exception e) {
                        logger.warn(
                            () -> new ParameterizedMessage(
                                "failed to notify response handler on exception, action: {}",
                                contextToNotify.action()
                            ),
                            e
                        );
                    }

                    @Override
                    protected void doRun() throws Exception {
                        contextToNotify.handler().handleException(sendRequestException);
                    }
                });
            } else {
                logger.debug("Exception while sending request, handler likely already notified due to timeout", e);
            }
        }
    }

然后就到了有连接来发送请求了.

connection.sendRequest(requestId, action, request, options);
Transport.Connection = new Transport.Connection{
    ...
}
@Override
public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options)
    throws TransportException {
    sendLocalRequest(requestId, action, request, options);
}
private void sendLocalRequest(long requestId, final String action, final TransportRequest request, TransportRequestOptions options) {
      // 
    final DirectResponseChannel channel = new DirectResponseChannel(localNode, action, requestId, this, threadPool);
    try {
        onRequestSent(localNode, requestId, action, request, options);
        onRequestReceived(requestId, action);
        @SuppressWarnings("unchecked")
        final RequestHandlerRegistry<TransportRequest> reg = (RequestHandlerRegistry<TransportRequest>) getRequestHandler(action);
        if (reg == null) {
            throw new ActionNotFoundTransportException("Action [" + action + "] not found");
        }
        final String executor = reg.getExecutor();
        if (ThreadPool.Names.SAME.equals(executor)) {
            reg.processMessageReceived(request, channel);
        } else {
            boolean success = false;
            request.incRef();
            try {
                threadPool.executor(executor).execute(new AbstractRunnable() {
                    //
                    @Override
                    protected void doRun() throws Exception {
                        reg.processMessageReceived(request, channel);
                    }

                    @Override
                    public boolean isForceExecution() {
                        return reg.isForceExecution();
                    }

                    @Override
                    public void onFailure(Exception e) {
                        try {
                            channel.sendResponse(e);
                        } catch (Exception inner) {
                            inner.addSuppressed(e);
                            logger.warn(
                                () -> new ParameterizedMessage("failed to notify channel of error message for action [{}]", action),
                                inner
                            );
                        }
                    }

                    @Override
                    public String toString() {
                        return "processing of [" + requestId + "][" + action + "]: " + request;
                    }

                    @Override
                    public void onAfter() {
                        request.decRef();
                    }
                });
                success = true;
            } finally {
                if (success == false) {
                    request.decRef();
                }
            }
        }

    } catch (Exception e) {
        try {
            channel.sendResponse(e);
        } catch (Exception inner) {
            inner.addSuppressed(e);
            logger.warn(() -> new ParameterizedMessage("failed to notify channel of error message for action [{}]", action), inner);
        }
    }
}

doRun就进入到核心的get流程了,最终会走到InternalEngine里,这个类到es的核心引擎层了.

       @Override
                protected void doRun() throws Exception {
                    handler.messageReceived(request, channel, task);
                }
 @Override
        public void messageReceived(final Request request, final TransportChannel channel, Task task) throws Exception {
            if (logger.isTraceEnabled()) {
                logger.trace("executing [{}] on shard [{}]", request, request.internalShardId);
            }
           //
            asyncShardOperation(request, request.internalShardId, new ChannelActionListener<>(channel, transportShardAction, request));
        }
   @Override
    protected void asyncShardOperation(GetRequest request, ShardId shardId, ActionListener<GetResponse> listener) throws IOException {
        IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
        IndexShard indexShard = indexService.getShard(shardId.id());
        if (request.realtime()) { // we are not tied to a refresh cycle here anyway
            super.asyncShardOperation(request, shardId, listener);
        } else {
            indexShard.awaitShardSearchActive(b -> {
                try {
                    super.asyncShardOperation(request, shardId, listener);
                } catch (Exception ex) {
                    listener.onFailure(ex);
                }
            });
        }
    }

有线程池开始处理了.

 protected void asyncShardOperation(Request request, ShardId shardId, ActionListener<Response> listener) throws IOException {
        threadPool.executor(getExecutor(request, shardId)).execute(ActionRunnable.supply(listener, () -> shardOperation(request, shardId)));
    }
@Override
    protected GetResponse shardOperation(GetRequest request, ShardId shardId) {
        IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
        IndexShard indexShard = indexService.getShard(shardId.id());

        if (request.refresh() && request.realtime() == false) {
            indexShard.refresh("refresh_flag_get");
        }
      
        GetResult result = indexShard.getService()
            .get(
                request.type(),
                request.id(),
                request.storedFields(),
                request.realtime(),
                request.version(),
                request.versionType(),
                request.fetchSourceContext()
            );
        return new GetResponse(result);
    }
public GetResult get(
        String type,
        String id,
        String[] gFields,
        boolean realtime,
        long version,
        VersionType versionType,
        FetchSourceContext fetchSourceContext
    ) {
        return get(type, id, gFields, realtime, version, versionType, UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM, fetchSourceContext);
    }

ShardGetService

 private GetResult innerGet(
        String type,
        String id,
        String[] gFields,
        boolean realtime,
        long version,
        VersionType versionType,
        long ifSeqNo,
        long ifPrimaryTerm,
        FetchSourceContext fetchSourceContext
    ) {
        fetchSourceContext = normalizeFetchSourceContent(fetchSourceContext, gFields);
        if (type == null || type.equals("_all")) {
            MappingLookup mappingLookup = mapperService.mappingLookup();
            type = mappingLookup.hasMappings() ? mappingLookup.getType() : null;
        }

        Engine.GetResult get = null;
        if (type != null) {
            Term uidTerm = new Term(IdFieldMapper.NAME, Uid.encodeId(id));
            get = indexShard.get(
                new Engine.Get(realtime, realtime, type, id, uidTerm).version(version)
                    .versionType(versionType)
                    .setIfSeqNo(ifSeqNo)
                    .setIfPrimaryTerm(ifPrimaryTerm)
            );
            if (get.exists() == false) {
                get.close();
            }
        }

        if (get == null || get.exists() == false) {
            return new GetResult(shardId.getIndexName(), type, id, UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM, -1, false, null, null, null);
        }

        try {
            // break between having loaded it from translog (so we only have _source), and having a document to load
            return innerGetLoadFromStoredFields(type, id, gFields, fetchSourceContext, get);
        } finally {
            get.close();
        }
    }

IndexShard

  public Engine.GetResult get(Engine.Get get) {
        readAllowed();
        MappingLookup mappingLookup = mapperService.mappingLookup();
        if (mappingLookup.hasMappings() == false
            || mappingLookup.getType().equals(mapperService.resolveDocumentType(get.type())) == false) {
            return GetResult.NOT_EXISTS;
        }
        //获取引擎然后get
        return getEngine().get(get, mappingLookup, mapperService.documentParser(), this::wrapSearcher);
    }

成了,这个方法后面再看,是从哪里取到索引的,也就能看到es的索引存储结构了.

  @Override
    public GetResult get(
        Get get,
        MappingLookup mappingLookup,
        DocumentParser documentParser,
        Function<Engine.Searcher, Engine.Searcher> searcherWrapper
    ) {
        assert Objects.equals(get.uid().field(), IdFieldMapper.NAME) : get.uid().field();
        System.out.println("get"+get.toString());
        try (ReleasableLock ignored = readLock.acquire()) {
            ensureOpen();
            if (get.realtime()) {
                final VersionValue versionValue;
                try (Releasable ignore = versionMap.acquireLock(get.uid().bytes())) {
                    // we need to lock here to access the version map to do this truly in RT
                    versionValue = getVersionFromMap(get.uid().bytes());
                }
                if (versionValue != null) {
                    if (versionValue.isDelete()) {
                        return GetResult.NOT_EXISTS;
                    }
                    if (get.versionType().isVersionConflictForReads(versionValue.version, get.version())) {
                        throw new VersionConflictEngineException(
                            shardId,
                            get.id(),
                            get.versionType().explainConflictForReads(versionValue.version, get.version())
                        );
                    }
                    if (get.getIfSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO
                        && (get.getIfSeqNo() != versionValue.seqNo || get.getIfPrimaryTerm() != versionValue.term)) {
                        throw new VersionConflictEngineException(
                            shardId,
                            get.id(),
                            get.getIfSeqNo(),
                            get.getIfPrimaryTerm(),
                            versionValue.seqNo,
                            versionValue.term
                        );
                    }
                    if (get.isReadFromTranslog()) {
                        // this is only used for updates - API _GET calls will always read form a reader for consistency
                        // the update call doesn't need the consistency since it's source only + _parent but parent can go away in 7.0
                        if (versionValue.getLocation() != null) {
                            try {
                                final Translog.Operation operation = translog.readOperation(versionValue.getLocation());
                                if (operation != null) {
                                    return getFromTranslog(get, (Translog.Index) operation, mappingLookup, documentParser, searcherWrapper);
                                }
                            } catch (IOException e) {
                                maybeFailEngine("realtime_get", e); // lets check if the translog has failed with a tragic event
                                throw new EngineException(shardId, "failed to read operation from translog", e);
                            }
                        } else {
                            trackTranslogLocation.set(true);
                        }
                    }
                    assert versionValue.seqNo >= 0 : versionValue;
                    refreshIfNeeded("realtime_get", versionValue.seqNo);
                }
                return getFromSearcher(get, acquireSearcher("realtime_get", SearcherScope.INTERNAL, searcherWrapper), false);
            } else {
                // we expose what has been externally expose in a point in time snapshot via an explicit refresh
                return getFromSearcher(get, acquireSearcher("get", SearcherScope.EXTERNAL, searcherWrapper), false);
            }
        }
    }

get到结果之后,就要返回了.上面的innerGet会封装成一个GetResult,然后在shardOperation里会把getResult封装成GetResponse.
有了getResponse后,在(ActionRunnable.supply(listener, () -> shardOperation(request, shardId))里,监听者就要发挥作用了.

 public static <T> ActionRunnable<T> supply(ActionListener<T> listener, CheckedSupplier<T, Exception> supplier) {
        return ActionRunnable.wrap(listener, l -> l.onResponse(supplier.get()));
    }
    @Override
    public void onResponse(Response response) {
        try {
            //TaskTransportChannel
            channel.sendResponse(response);
        } catch (Exception e) {
            onFailure(e);
        }
    }
  @Override
    public void sendResponse(TransportResponse response) throws IOException {
        try {
            onTaskFinished.close();
        } finally {
            //DirectResponseChannel.
            channel.sendResponse(response);
        }
    }

基本上到这里就结束了,简单的流程就是先封装好一个请求,然后由线程池处理请求,线程池处理完请求发给对应的channel.基本上就是client,server的典型处理流程.然后总结下调用链中每个涉及到的类吧,多思考思考这些大牛是如果做类的设计,以及为什么要这么设计.


transport_worker线程组
elasticsearch线程组

流程虽然绕,但是这个总体的过程就是个client,server的过程,基础很重要,所以一定要把netty搞透彻了.es的代码是真的顶!下一篇会把这里面的类的设计,以及一些很惊艳的代码拉出来鉴赏一下.

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,193评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,306评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,130评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,110评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,118评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,085评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,007评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,844评论 0 273
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,283评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,508评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,667评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,395评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,985评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,630评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,797评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,653评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,553评论 2 352

推荐阅读更多精彩内容