源码阅读笔记:分布式服务框架XXL-RPC(基于1.4.1)todo

前言:接上篇,看完了注册中心,该看看RPC框架了——《分布式服务框架XXL-RPC》

老样子,想看看它自己怎么吹的

1.1 概述>

XXL-RPC 是一个分布式服务框架,提供稳定高性能的RPC远程服务调用功能。拥有"高性能、分布式、注册中心、负载均衡、服务治理"等特性。现已开放源代码,开箱即用。>

1.2 特性>

  • 1、快速接入:接入步骤非常简洁,两分钟即可上手;
  • 2、服务透明:系统完整的封装了底层通信细节,开发时调用远程服务就像调用本地服务,在提供远程调用能力时不损失本地调用的语义简洁性;
  • 3、多调用方案:支持 SYNC、ONEWAY、FUTURE、CALLBACK 等方案;
  • 4、多通讯方案:支持 TCP 和 HTTP 两种通讯方式进行服务调用;其中 TCP 提供可选方案 NETTY 或 MINA ,HTTP 提供可选方案 NETTY_HTTP 或 Jetty;
  • 5、多序列化方案:支持 HESSIAN、HESSIAN1、PROTOSTUFF、KRYO、JACKSON 等方案;
  • 6、负载均衡/软负载:提供丰富的负载均衡策略,包括:轮询、随机、LRU、LFU、一致性HASH等;
  • 7、注册中心:可选组件,支持服务注册并动态发现;可选择不启用,直接指定服务提供方机器地址通讯;选择启用时,内置可选方案:“XXL-REGISTRY 轻量级注册中心”(推荐)、“ZK注册中心”、“Local注册中心”等;
  • 8、服务治理:提供服务治理中心,可在线管理注册的服务信息,如服务锁定、禁用等;
  • 9、服务监控:可在线监控服务调用统计信息以及服务健康状况等(计划中);
  • 10、容错:服务提供方集群注册时,某个服务节点不可用时将会自动摘除,同时消费方将会移除失效节点将流量分发到其余节点,提高系统容错能力。
  • 11、解决1+1问题:传统分布式通讯一般通过nginx或f5做集群服务的流量负载均衡,每次请求在到达目标服务机器之前都需要经过负载均衡机器,即1+1,这将会把流量放大一倍。而XXL-RPC将会从消费方直达服务提供方,每次请求直达目标机器,从而可以避免上述问题;
  • 12、高兼容性:得益于优良的兼容性与模块化设计,不限制外部框架;除 spring/springboot 环境之外,理论上支持运行在任何Java代码中,甚至main方法直接启动运行;
  • 13、泛化调用:服务调用方不依赖服务方提供的API;

还是老套路,直接代码下下来,跑个demo看看(ps:注册中心它自己推荐的XXL-REGISTRY)
先看看provider的代码。provider提供了一个简单的sayHi方法,代码如下。

@XxlRpcService
@Service
public class DemoServiceImpl implements DemoService {
    private static Logger logger = LoggerFactory.getLogger(DemoServiceImpl.class);

    @Override
    public UserDTO sayHi(String name) {

        String word = MessageFormat.format("Hi {0}, from {1} as {2}",
                name, DemoServiceImpl.class.getName(), String.valueOf(System.currentTimeMillis()));

        if ("error".equalsIgnoreCase(name)) throw new RuntimeException("test exception.");

        UserDTO userDTO = new UserDTO(name, word);
        logger.info(userDTO.toString());

        return userDTO;
    }

}

结着按照说明稍微配置一下注册中心,provider走你!
成功启动后可以在注册中心看到刚刚启动的provider


注册中心

点开编辑看看


编辑

很好,看起来非常美好,注册key就是接口,注册信息就是provider的url,看起来非常ok。
那么我们再把consumer跑起来看看,配置同一个注册中心,走你!
控制台表示成功运行起来了。那么我们看看怎么测试一下远程调用。
翻一翻consumer代码,看到一个controller,然后在controller里面会调用远程方法。

@Controller
public class IndexController {
    
    @XxlRpcReference
    private DemoService demoService;


    @RequestMapping("")
    @ResponseBody
    public UserDTO http(String name) {

        try {
            return demoService.sayHi(name);
        } catch (Exception e) {
            e.printStackTrace();
            return new UserDTO(null, e.getMessage());
        }
    }

}

ok,让我们打开浏览器试一试。和预期的一样,得到了provider的response


浏览器直接调用

这样一个最简单的远程调用就完成了。
那么接下来,就该看看这些功能是怎么实现的了。先帖个框架自己对rpc的描述

4.4 RPC工作原理剖析

rpc原理

概念
1、serialization:序列化,通讯数据需要经过序列化,从而支持在网络中传输;
2、deserialization:反序列化,服务接受到序列化的请求数据,需要序列化为底层原始数据;
3、stub:体现在XXL-RPC为服务的api接口;
4、skeleton:体现在XXL-RPC为服务的实现api接口的具体服务;
5、proxy:根据远程服务的stub生成的代理服务,对开发人员透明;
6、provider:远程服务的提供方;
7、consumer:远程服务的消费方;

RPC通讯,可大致划分为四个步骤,可参考上图进行理解:(XXL-RPC提供了多种调用方案,此处以 “SYNC” 方案为例讲解;)>
1、consumer发起请求:consumer会根据远程服务的stub实例化远程服务的代理服务,在发起请求时,代理服务会封装本次请求相关底层数据,如服务iface、methos、params等等,然后将数据经过serialization之后发送给provider;
2、provider接收请求:provider接收到请求数据,首先会deserialization获取原始请求数据,然后根据stub匹配目标服务并调用;
3、provider响应请求:provider在调用目标服务后,封装服务返回数据并进行serialization,然后把数据传输给consumer;
4、consumer接收响应:consumer接受到相应数据后,首先会deserialization获取原始数据,然后根据stub生成调用返回结果,返回给请求调用处。结束。

其实已经讲得比较清楚,在官方提供的demo里,

  1. consumer调用sayHi方法
  2. 通过注册中心找到provider
  3. 代理类封装请求并序列化后发送给provider
  4. provider反序列化数据,发现调用的是sayHi方法
  5. 把调用结果序列化返回给consumer
  6. consumer反序列化返回结果

接下来回到代码本身,看看这一系列过程是怎么实现的。
先从provider的demo入手吧。
先看看配置,只配置了一个XxlRpcSpringProviderFactorybean
从配置代码来看,配置了provider的端口,注册中心的类型(xxl-registry or zookeeper or local,这里是xxl-registry) ,已经注册中心的一些参数(这里是对应注册中心xxl-registry需要的配置:注册中心地址,环境,token)

@Configuration
public class XxlRpcProviderConfig {
    private Logger logger = LoggerFactory.getLogger(XxlRpcProviderConfig.class);

    @Value("${xxl-rpc.remoting.port}")
    private int port;

    @Value("${xxl-rpc.registry.xxlregistry.address}")
    private String address;

    @Value("${xxl-rpc.registry.xxlregistry.env}")
    private String env;

    @Value("${xxl-rpc.registry.xxlregistry.token}")
    private String token;

    @Bean
    public XxlRpcSpringProviderFactory xxlRpcSpringProviderFactory() {

        XxlRpcSpringProviderFactory providerFactory = new XxlRpcSpringProviderFactory();
        providerFactory.setPort(port);
        providerFactory.setServiceRegistryClass(XxlRegistryServiceRegistry.class);
        providerFactory.setServiceRegistryParam(new HashMap<String, String>(){{
            put(XxlRegistryServiceRegistry.XXL_REGISTRY_ADDRESS, address);
            put(XxlRegistryServiceRegistry.ENV, env);
            put(XxlRegistryServiceRegistry.ACCESS_TOKEN,token);
        }});

        logger.info(">>>>>>>>>>> xxl-rpc provider config init finish.");
        return providerFactory;
    }

}

ok,那我们在看看XxlRpcSpringProviderFactory有什么花头。
实现了3个接口
implements ApplicationContextAware, InitializingBean,DisposableBean
,并继承自XxlRpcProviderFactory

ps:这几个接口都是一些spring bean的一些扩展,详细的可以自行搜索, 下面给出一些简单的描述

InitialingBean是一个接口,提供了一个唯一的方法afterPropertiesSet()
DisposableBean也是一个接口,提供了一个唯一的方法destory()
前者顾名思义在Bean属性都设置完毕后调用afterPropertiesSet()方法做一些初始化的工作,后者在Bean生命周期结束前调用destory()方法做一些收尾工作

实现ApplicationContextAware接口的Bean,在Bean加载的过程中可以获取到Spring的ApplicationContext,这个尤其重要,ApplicationContext是Spring应用上下文,从ApplicationContext中可以获取包括任意的Bean在内的大量Spring容器内容和信息

public class XxlRpcSpringProviderFactory extends XxlRpcProviderFactory implements ApplicationContextAware, InitializingBean,DisposableBean {

这里比较好理解,因为XxlRpcSpringProviderFactory是针对spring的客户端,所需需要额外实现几个接口,主要的逻辑是在它的父类XxlRpcProviderFactory里面。
先看看第一段

// ---------------------- config ----------------------

里面的内容
netType定义了网络通信协议(NETTY,NETTY_HTTP,MINA,JETTY)
Serializer定义了序列化方式
ip,port,accessToken还不知道干嘛,留着
serviceRegistryClassserviceRegistryParam定义注册中心类和参数

    private NetEnum netType;
    private Serializer serializer;

    private String ip;                  // for registry
    private int port;                   // default port
    private String accessToken;

    private Class<? extends ServiceRegistry> serviceRegistryClass;
    private Map<String, String> serviceRegistryParam;

再往下看,这些属性的设置全是在initConfig方法中设值的。
通过这段代码,就可以知道,ip其实是给consumer使用的本地ip(会注册到注册中心的ip),port其实就是用来和consumer通信的端口号(比如刚刚demo里面的7080)

public void initConfig(NetEnum netType,
                          Serializer serializer,
                          String ip,
                          int port,
                          String accessToken,
                           Class<? extends ServiceRegistry> serviceRegistryClass,
                          Map<String, String> serviceRegistryParam) {

        // init
        this.netType = netType;
        this.serializer = serializer;
        this.ip = ip;
        this.port = port;
        this.accessToken = accessToken;
        this.serviceRegistryClass = serviceRegistryClass;
        this.serviceRegistryParam = serviceRegistryParam;

        // valid
        if (this.netType==null) {
            throw new XxlRpcException("xxl-rpc provider netType missing.");
        }
        if (this.serializer==null) {
            throw new XxlRpcException("xxl-rpc provider serializer missing.");
        }
        if (this.ip == null) {
            this.ip = IpUtil.getIp();
        }
        if (this.port <= 0) {
            this.port = 7080;
        }
        if (NetUtil.isPortUsed(this.port)) {
            throw new XxlRpcException("xxl-rpc provider port["+ this.port +"] is used.");
        }
        if (this.serviceRegistryClass != null) {
            if (this.serviceRegistryParam == null) {
                throw new XxlRpcException("xxl-rpc provider serviceRegistryParam is missing.");
            }
        }

    }

这段代码其实就是一些基础参数的config。比如用什么通信协议,开房什么端口,用什么注册中心等等。
再接着往下看

// ---------------------- start / stop ----------------------

看看start和stop代码有什么

    private Server server;
    private ServiceRegistry serviceRegistry;
    private String serviceAddress;

先往下看,定义了start和stop方法,仔细一看是针对server的startstop,server是netType的一个instance。
那么就比较好理解了,server就是负责通信的实例(demo里是netty)。
首先拿到server的实例,然后设置了setStartedCallbacksetStopedCallback,并调用了start方法。

public void start() throws Exception {
        // start server
        serviceAddress = IpUtil.getIpPort(this.ip, port);
        server = netType.serverClass.newInstance();
        server.setStartedCallback(new BaseCallback() {      // serviceRegistry started
            @Override
            public void run() throws Exception {
                // start registry
                if (serviceRegistryClass != null) {
                    serviceRegistry = serviceRegistryClass.newInstance();
                    serviceRegistry.start(serviceRegistryParam);
                    if (serviceData.size() > 0) {
                        serviceRegistry.registry(serviceData.keySet(), serviceAddress);
                    }
                }
            }
        });
        server.setStopedCallback(new BaseCallback() {       // serviceRegistry stoped
            @Override
            public void run() {
                // stop registry
                if (serviceRegistry != null) {
                    if (serviceData.size() > 0) {
                        serviceRegistry.remove(serviceData.keySet(), serviceAddress);
                    }
                    serviceRegistry.stop();
                    serviceRegistry = null;
                }
            }
        });
        server.start(this);
    }

    public void  stop() throws Exception {
        // stop server
        server.stop();
    }

那我们再深入看看server这些callbackstart方法都干了什么吧。
server是一个抽象类(nettyServer会继承这个类),定义了BaseCallback类型的startedCallbackstopedCallback,根据名字猜测是通信server调用start后,会调用startedCallback.run方法,server调用stop之后会调用stopedCallback.run方法。好像比较抽象,毕竟是抽象类。

public abstract class Server {
    protected static final Logger logger = LoggerFactory.getLogger(Server.class);


    private BaseCallback startedCallback;
    private BaseCallback stopedCallback;

    public void setStartedCallback(BaseCallback startedCallback) {
        this.startedCallback = startedCallback;
    }

    public void setStopedCallback(BaseCallback stopedCallback) {
        this.stopedCallback = stopedCallback;
    }


    /**
     * start server
     *
     * @param xxlRpcProviderFactory
     * @throws Exception
     */
    public abstract void start(final XxlRpcProviderFactory xxlRpcProviderFactory) throws Exception;

    /**
     * callback when started
     */
    public void onStarted() {
        if (startedCallback != null) {
            try {
                startedCallback.run();
            } catch (Exception e) {
                logger.error(">>>>>>>>>>> xxl-rpc, server startedCallback error.", e);
            }
        }
    }

    /**
     * stop server
     *
     * @throws Exception
     */
    public abstract void stop() throws Exception;

    /**
     * callback when stoped
     */
    public void onStoped() {
        if (stopedCallback != null) {
            try {
                stopedCallback.run();
            } catch (Exception e) {
                logger.error(">>>>>>>>>>> xxl-rpc, server stopedCallback error.", e);
            }
        }
    }

}

那我们直接进入继承他的NettyServer看看,这样应该就比较清晰了。
start方法里面直接开了一个守护线程,线程做的事情非常简单:配置并开启netty服务,并调用onStarted方法
stop方法更简单,直接interrupt,并调用onStoped方法。

public class NettyServer extends Server {

    private Thread thread;

    @Override
    public void start(final XxlRpcProviderFactory xxlRpcProviderFactory) throws Exception {

        thread = new Thread(new Runnable() {
            @Override
            public void run() {

                // param
                final ThreadPoolExecutor serverHandlerPool = ThreadPoolUtil.makeServerThreadPool(NettyServer.class.getSimpleName());
                EventLoopGroup bossGroup = new NioEventLoopGroup();
                EventLoopGroup workerGroup = new NioEventLoopGroup();

                try {
                    // start server
                    ServerBootstrap bootstrap = new ServerBootstrap();
                    bootstrap.group(bossGroup, workerGroup)
                            .channel(NioServerSocketChannel.class)
                            .childHandler(new ChannelInitializer<SocketChannel>() {
                                @Override
                                public void initChannel(SocketChannel channel) throws Exception {
                                    channel.pipeline()
                                            .addLast(new IdleStateHandler(0,0,10, TimeUnit.MINUTES))
                                            .addLast(new NettyDecoder(XxlRpcRequest.class, xxlRpcProviderFactory.getSerializer()))
                                            .addLast(new NettyEncoder(XxlRpcResponse.class, xxlRpcProviderFactory.getSerializer()))
                                            .addLast(new NettyServerHandler(xxlRpcProviderFactory, serverHandlerPool));
                                }
                            })
                            .childOption(ChannelOption.TCP_NODELAY, true)
                            .childOption(ChannelOption.SO_KEEPALIVE, true);

                    // bind
                    ChannelFuture future = bootstrap.bind(xxlRpcProviderFactory.getPort()).sync();

                    logger.info(">>>>>>>>>>> xxl-rpc remoting server start success, nettype = {}, port = {}", NettyServer.class.getName(), xxlRpcProviderFactory.getPort());
                    onStarted();

                    // wait util stop
                    future.channel().closeFuture().sync();

                } catch (Exception e) {
                    if (e instanceof InterruptedException) {
                        logger.info(">>>>>>>>>>> xxl-rpc remoting server stop.");
                    } else {
                        logger.error(">>>>>>>>>>> xxl-rpc remoting server error.", e);
                    }
                } finally {

                    // stop
                    try {
                        serverHandlerPool.shutdown();    // shutdownNow
                    } catch (Exception e) {
                        logger.error(e.getMessage(), e);
                    }
                    try {
                        workerGroup.shutdownGracefully();
                        bossGroup.shutdownGracefully();
                    } catch (Exception e) {
                        logger.error(e.getMessage(), e);
                    }

                }
            }
        });
        thread.setDaemon(true);
        thread.start();

    }

    @Override
    public void stop() throws Exception {

        // destroy server thread
        if (thread != null && thread.isAlive()) {
            thread.interrupt();
        }

        // on stop
        onStoped();
        logger.info(">>>>>>>>>>> xxl-rpc remoting server destroy success.");
    }

}

让我们再回到刚才的代码,startedCallback方法就是在netty服务启动完成之后,把provider的信息注册到注册中心

ServiceRegistry是个注册中心的抽象,demo里面用的是XxlRegistryServiceRegistry,其实就是对注册中心操作的一些封装,代码非常简单,不懂可以参考上一篇源码阅读:分布式服务注册中心XXL-REGISTRY(基于1.0.2)

server.setStartedCallback(new BaseCallback() {      // serviceRegistry started
            @Override
            public void run() throws Exception {
                // start registry
                if (serviceRegistryClass != null) {
                    serviceRegistry = serviceRegistryClass.newInstance();
                    serviceRegistry.start(serviceRegistryParam);
                    if (serviceData.size() > 0) {
                        serviceRegistry.registry(serviceData.keySet(), serviceAddress);
                    }
                }
            }
        });

stopedCallback也比较好理解了,就是在netty服务关闭之后,从注册中心移除自己。

server.setStopedCallback(new BaseCallback() {       // serviceRegistry stoped
            @Override
            public void run() {
                // stop registry
                if (serviceRegistry != null) {
                    if (serviceData.size() > 0) {
                        serviceRegistry.remove(serviceData.keySet(), serviceAddress);
                    }
                    serviceRegistry.stop();
                    serviceRegistry = null;
                }
            }
        });

最后再看看server invoke里面有什么吧
看起来像是用来记rpc service的,先不管他,接着往下看

// ---------------------- server invoke ----------------------
/**
     * init local rpc service map
     */
    private Map<String, Object> serviceData = new HashMap<String, Object>();
    public Map<String, Object> getServiceData() {
        return serviceData;
    }

好像就是字符串的拼接,不知道干嘛的,先往下看

    /**
     * make service key
     *
     * @param iface
     * @param version
     * @return
     */
    public static String makeServiceKey(String iface, String version){
        String serviceKey = iface;
        if (version!=null && version.trim().length()>0) {
            serviceKey += "#".concat(version);
        }
        return serviceKey;
    }

addService用到了makeServiceKey,看来这个是用来做唯一主键的。
根据名字推测,应该是往前面的serviceData里面把serviceBean放进去。

/**
     * add service
     *
     * @param iface
     * @param version
     * @param serviceBean
     */
    public void addService(String iface, String version, Object serviceBean){
        String serviceKey = makeServiceKey(iface, version);
        serviceData.put(serviceKey, serviceBean);

        logger.info(">>>>>>>>>>> xxl-rpc, provider factory add service success. serviceKey = {}, serviceBean = {}", serviceKey, serviceBean.getClass());
    }

通过IDE看看哪里用了addService方法
发现在刚才的XxlRpcSpringProviderFactory就有用到!
看下代码,其实很简单:

  • 从spring上下文找到加了XxlRpcService注解的bean
  • 接口名+版本号作为唯一主键把bean放入serviceData里面
@Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {

        Map<String, Object> serviceBeanMap = applicationContext.getBeansWithAnnotation(XxlRpcService.class);
        if (serviceBeanMap!=null && serviceBeanMap.size()>0) {
            for (Object serviceBean : serviceBeanMap.values()) {
                // valid
                if (serviceBean.getClass().getInterfaces().length ==0) {
                    throw new XxlRpcException("xxl-rpc, service(XxlRpcService) must inherit interface.");
                }
                // add service
                XxlRpcService xxlRpcService = serviceBean.getClass().getAnnotation(XxlRpcService.class);

                String iface = serviceBean.getClass().getInterfaces()[0].getName();
                String version = xxlRpcService.version();

                super.addService(iface, version, serviceBean);
            }
        }

        // TODO,addServices by api + prop

    }

最后一个方法,从方法名就能看出来,调用service,接受一个xxlRpcRequest参数
serviceData里面取出request里面要调用的bean
通过反射调用方法并返回response

/**
     * invoke service
     *
     * @param xxlRpcRequest
     * @return
     */
    public XxlRpcResponse invokeService(XxlRpcRequest xxlRpcRequest) {

        //  make response
        XxlRpcResponse xxlRpcResponse = new XxlRpcResponse();
        xxlRpcResponse.setRequestId(xxlRpcRequest.getRequestId());

        // match service bean
        String serviceKey = makeServiceKey(xxlRpcRequest.getClassName(), xxlRpcRequest.getVersion());
        Object serviceBean = serviceData.get(serviceKey);

        // valid
        if (serviceBean == null) {
            xxlRpcResponse.setErrorMsg("The serviceKey["+ serviceKey +"] not found.");
            return xxlRpcResponse;
        }

        if (System.currentTimeMillis() - xxlRpcRequest.getCreateMillisTime() > 3*60*1000) {
            xxlRpcResponse.setErrorMsg("The timestamp difference between admin and executor exceeds the limit.");
            return xxlRpcResponse;
        }
        if (accessToken!=null && accessToken.trim().length()>0 && !accessToken.trim().equals(xxlRpcRequest.getAccessToken())) {
            xxlRpcResponse.setErrorMsg("The access token[" + xxlRpcRequest.getAccessToken() + "] is wrong.");
            return xxlRpcResponse;
        }

        try {
            // invoke
            Class<?> serviceClass = serviceBean.getClass();
            String methodName = xxlRpcRequest.getMethodName();
            Class<?>[] parameterTypes = xxlRpcRequest.getParameterTypes();
            Object[] parameters = xxlRpcRequest.getParameters();

            Method method = serviceClass.getMethod(methodName, parameterTypes);
            method.setAccessible(true);
            Object result = method.invoke(serviceBean, parameters);

            /*FastClass serviceFastClass = FastClass.create(serviceClass);
            FastMethod serviceFastMethod = serviceFastClass.getMethod(methodName, parameterTypes);
            Object result = serviceFastMethod.invoke(serviceBean, parameters);*/

            xxlRpcResponse.setResult(result);
        } catch (Throwable t) {
            // catch error
            logger.error("xxl-rpc provider invokeService error.", t);
            xxlRpcResponse.setErrorMsg(ThrowableUtil.toString(t));
        }

        return xxlRpcResponse;
    }

ok,到这里为止,XxlRpcProviderFactory的代码看完了,来总结一下它究竟能干什么事情

  • 配置通信协议,序列化方式,注册中心
  • 开启通信server
  • serviceData里所有的provider服务注册到注册中心
  • 通过反射机制,提供调用服务的(invokeService)方法
    看完了XxlRpcProviderFactory,我们再回到XxlRpcSpringProviderFactory
    与父类不同的是,提供了netType默认使用netty,序列化默认使用hessian
    // ---------------------- config ----------------------

    private String netType = NetEnum.NETTY.name();
    private String serialize = Serializer.SerializeEnum.HESSIAN.name();

再看看必须实现的几个接口
这个方法前面已经看到过,这里是把所有带有XxlRpcService注解的bean放到serverData这个map里面

@Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {

        Map<String, Object> serviceBeanMap = applicationContext.getBeansWithAnnotation(XxlRpcService.class);
        if (serviceBeanMap!=null && serviceBeanMap.size()>0) {
            for (Object serviceBean : serviceBeanMap.values()) {
                // valid
                if (serviceBean.getClass().getInterfaces().length ==0) {
                    throw new XxlRpcException("xxl-rpc, service(XxlRpcService) must inherit interface.");
                }
                // add service
                XxlRpcService xxlRpcService = serviceBean.getClass().getAnnotation(XxlRpcService.class);

                String iface = serviceBean.getClass().getInterfaces()[0].getName();
                String version = xxlRpcService.version();

                super.addService(iface, version, serviceBean);
            }
        }

        // TODO,addServices by api + prop

    }

最后两个方法,很简单,就是配置一下基本参数,以及调用父类的方法。

    @Override
    public void afterPropertiesSet() throws Exception {
        this.prepareConfig();
        super.start();
    }

    @Override
    public void destroy() throws Exception {
        super.stop();
    }

这样一来,XxlRpcSpringProviderFactory就全部阅读完了。我们重新梳理一遍流程XxlRpcInvokerConfig干的事情

  1. 配置通信框架(netty),序列化框架(hessian),注册中心(xxl-registry)
  2. 把使用了XxlRpcService注解的bean全部put到Map<String,Object> serviceData里面,key为bean继承的接口名+版本号,Object为service的bean本身
  3. 启动通信框架(netty),启动成功后把serviceData里面的bean注册到注册中心
    这样,provider就已经完全启动完成了,一切准备就绪,就等客户端调用了!
    那么,我们再看看客户端的代码!
    老套路,从配置开始看起,和provider的配置差不多,就不在赘述
@Configuration
public class XxlRpcInvokerConfig {
    private Logger logger = LoggerFactory.getLogger(XxlRpcInvokerConfig.class);


    @Value("${xxl-rpc.registry.xxlregistry.address}")
    private String address;

    @Value("${xxl-rpc.registry.xxlregistry.env}")
    private String env;

    @Value("${xxl-rpc.registry.xxlregistry.token}")
    private String token;


    @Bean
    public XxlRpcSpringInvokerFactory xxlJobExecutor() {

        XxlRpcSpringInvokerFactory invokerFactory = new XxlRpcSpringInvokerFactory();
        invokerFactory.setServiceRegistryClass(XxlRegistryServiceRegistry.class);
        invokerFactory.setServiceRegistryParam(new HashMap<String, String>(){{
            put(XxlRegistryServiceRegistry.XXL_REGISTRY_ADDRESS, address);
            put(XxlRegistryServiceRegistry.ENV, env);
            put(XxlRegistryServiceRegistry.ACCESS_TOKEN,token);
        }});

        logger.info(">>>>>>>>>>> xxl-rpc invoker config init finish.");
        return invokerFactory;
    }

}

直接去XxlRpcSpringInvokerFactory里面看看吧,InitializingBean,DisposableBean不再赘述

实现BeanFactoryAware接口的Bean,在Bean加载的过程中可以获取到加载该Bean的BeanFactory

InstantiationAwareBeanPostProcessor作用的是Bean实例化前后,即:
1、Bean构造出来之前调用postProcessBeforeInstantiation()方法
2、Bean构造出来之后调用postProcessAfterInstantiation()方法

public class XxlRpcSpringInvokerFactory extends InstantiationAwareBeanPostProcessorAdapter implements InitializingBean,DisposableBean, BeanFactoryAware {

ok,看看具体类里面的代码,先看第一段config相关
这段代码似曾相识,在ProviderFactory里面也有:注册中心配置

// ---------------------- config ----------------------

    private Class<? extends ServiceRegistry> serviceRegistryClass;          // class.forname
    private Map<String, String> serviceRegistryParam;


    public void setServiceRegistryClass(Class<? extends ServiceRegistry> serviceRegistryClass) {
        this.serviceRegistryClass = serviceRegistryClass;
    }

    public void setServiceRegistryParam(Map<String, String> serviceRegistryParam) {
        this.serviceRegistryParam = serviceRegistryParam;
    }

接着往下看,定义了一个 XxlRpcInvokerFactory

// ---------------------- util ----------------------

    private XxlRpcInvokerFactory xxlRpcInvokerFactory;

进到XxlRpcInvokerFactory 里面看看吧
首先很明显,这是一个单例模式
然后也配置了注册中心

public class XxlRpcInvokerFactory {
    private static Logger logger = LoggerFactory.getLogger(XxlRpcInvokerFactory.class);

    // ---------------------- default instance ----------------------

    private static volatile XxlRpcInvokerFactory instance = new XxlRpcInvokerFactory(LocalServiceRegistry.class, null);
    public static XxlRpcInvokerFactory getInstance() {
        return instance;
    }


    // ---------------------- config ----------------------

    private Class<? extends ServiceRegistry> serviceRegistryClass;          // class.forname
    private Map<String, String> serviceRegistryParam;


    public XxlRpcInvokerFactory() {
    }
    public XxlRpcInvokerFactory(Class<? extends ServiceRegistry> serviceRegistryClass, Map<String, String> serviceRegistryParam) {
        this.serviceRegistryClass = serviceRegistryClass;
        this.serviceRegistryParam = serviceRegistryParam;
    }
    // 略
}

再往下看,似曾相识的代码
start方法是开始想注册中心注册
stop方法先从注册中心移除,然后在吧stopCallbackList里面还没执行的方法执行了
那什么时候调用addStopCallBackstopCallBack加进list呢?用IDE搜一搜,
发现在JettyClientConnectClient的时候用到了,好像暂时和我们demo的代码没什么关系,先放一放
最后把responseCallbackThreadPool 线程池shutDown了。

// ---------------------- start / stop ----------------------

    public void start() throws Exception {
        // start registry
        if (serviceRegistryClass != null) {
            serviceRegistry = serviceRegistryClass.newInstance();
            serviceRegistry.start(serviceRegistryParam);
        }
    }

    public void  stop() throws Exception {
        // stop registry
        if (serviceRegistry != null) {
            serviceRegistry.stop();
        }

        // stop callback
        if (stopCallbackList.size() > 0) {
            for (BaseCallback callback: stopCallbackList) {
                try {
                    callback.run();
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                }
            }
        }

        // stop CallbackThreadPool
        stopCallbackThreadPool();
    }

// ---------------------- service registry ----------------------

    private ServiceRegistry serviceRegistry;
    public ServiceRegistry getServiceRegistry() {
        return serviceRegistry;
    }


    // ---------------------- service registry ----------------------

    private List<BaseCallback> stopCallbackList = new ArrayList<BaseCallback>();

    public void addStopCallBack(BaseCallback callback){
        stopCallbackList.add(callback);
    }


responseCallbackThreadPool线程池是用来干什么的?用IDE搜一搜,就在stopCallbackThreadPool上面
executeResponseCallback接受一个Runnable对象,并初始化线程池,并放入线程池
那么executeResponseCallback什么时候会被用到?

// ---------------------- response callback ThreadPool ----------------------

    private ThreadPoolExecutor responseCallbackThreadPool = null;
    public void executeResponseCallback(Runnable runnable){

        if (responseCallbackThreadPool == null) {
            synchronized (this) {
                if (responseCallbackThreadPool == null) {
                    responseCallbackThreadPool = new ThreadPoolExecutor(
                            10,
                            100,
                            60L,
                            TimeUnit.SECONDS,
                            new LinkedBlockingQueue<Runnable>(1000),
                            new ThreadFactory() {
                                @Override
                                public Thread newThread(Runnable r) {
                                    return new Thread(r, "xxl-rpc, XxlRpcInvokerFactory-responseCallbackThreadPool-" + r.hashCode());
                                }
                            },
                            new RejectedExecutionHandler() {
                                @Override
                                public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                                    throw new XxlRpcException("xxl-rpc Invoke Callback Thread pool is EXHAUSTED!");
                                }
                            });     // default maxThreads 300, minThreads 60
                }
            }
        }
        responseCallbackThreadPool.execute(runnable);
    }
    public void stopCallbackThreadPool() {
        if (responseCallbackThreadPool != null) {
            responseCallbackThreadPool.shutdown();
        }
    }

其实就在上面
定义了一个concurrentMap,参数是一个string和一个XxlRpcFutureResponse
根据future这个名字,可以猜一下应该是个future(多线程)操作相关的
set和remove方法看起来就是对map进行一些关于request的操作
notifyInvokerFuture方法,从futureResponsePool根据requestId取出一个XxlRpcFutureResponse对象
然后判断Response的状态,并做一些设置,然后从futureResponsePool移出这个requestId
到这里,还比较蒙逼,大概只能看出,利用了future,对response做一些处理。
那么问题来了,response是什么时候生产的?为什么会有一堆callback方法?这样做的目的是什么?
因为没有看到调用远程服务的代码,看不懂很正常!

 // ---------------------- future-response pool ----------------------
    // XxlRpcFutureResponseFactory

    private ConcurrentMap<String, XxlRpcFutureResponse> futureResponsePool = new ConcurrentHashMap<String, XxlRpcFutureResponse>();
    public void setInvokerFuture(String requestId, XxlRpcFutureResponse futureResponse){
        futureResponsePool.put(requestId, futureResponse);
    }
    public void removeInvokerFuture(String requestId){
        futureResponsePool.remove(requestId);
    }
    public void notifyInvokerFuture(String requestId, final XxlRpcResponse xxlRpcResponse){

        // get
        final XxlRpcFutureResponse futureResponse = futureResponsePool.get(requestId);
        if (futureResponse == null) {
            return;
        }

        // notify
        if (futureResponse.getInvokeCallback()!=null) {

            // callback type
            try {
                executeResponseCallback(new Runnable() {
                    @Override
                    public void run() {
                        if (xxlRpcResponse.getErrorMsg() != null) {
                            futureResponse.getInvokeCallback().onFailure(new XxlRpcException(xxlRpcResponse.getErrorMsg()));
                        } else {
                            futureResponse.getInvokeCallback().onSuccess(xxlRpcResponse.getResult());
                        }
                    }
                });
            }catch (Exception e) {
                logger.error(e.getMessage(), e);
            }
        } else {

            // other nomal type
            futureResponse.setResponse(xxlRpcResponse);
        }

        // do remove
        futureResponsePool.remove(requestId);

    }

让我们回到XxlRpcSpringInvokerFactory,还剩最后一个方法postProcessAfterInstantiation
看看都干了些什么吧
取出加了XxlRpcReference注解的字段(field)
组装成XxlRpcReferenceBean,并根据名字猜测,通过这个bean得到一个service的代理对象!

@Override
    public boolean postProcessAfterInstantiation(final Object bean, final String beanName) throws BeansException {

        // collection
        final Set<String> serviceKeyList = new HashSet<>();

        // parse XxlRpcReferenceBean
        ReflectionUtils.doWithFields(bean.getClass(), new ReflectionUtils.FieldCallback() {
            @Override
            public void doWith(Field field) throws IllegalArgumentException, IllegalAccessException {
                if (field.isAnnotationPresent(XxlRpcReference.class)) {
                    // valid
                    Class iface = field.getType();
                    if (!iface.isInterface()) {
                        throw new XxlRpcException("xxl-rpc, reference(XxlRpcReference) must be interface.");
                    }

                    XxlRpcReference rpcReference = field.getAnnotation(XxlRpcReference.class);

                    // init reference bean
                    XxlRpcReferenceBean referenceBean = new XxlRpcReferenceBean(
                            rpcReference.netType(),
                            rpcReference.serializer().getSerializer(),
                            rpcReference.callType(),
                            rpcReference.loadBalance(),
                            iface,
                            rpcReference.version(),
                            rpcReference.timeout(),
                            rpcReference.address(),
                            rpcReference.accessToken(),
                            null,
                            xxlRpcInvokerFactory
                    );

                    Object serviceProxy = referenceBean.getObject();

                    // set bean
                    field.setAccessible(true);
                    field.set(bean, serviceProxy);

                    logger.info(">>>>>>>>>>> xxl-rpc, invoker factory init reference bean success. serviceKey = {}, bean.field = {}.{}",
                            XxlRpcProviderFactory.makeServiceKey(iface.getName(), rpcReference.version()), beanName, field.getName());

                    // collection
                    String serviceKey = XxlRpcProviderFactory.makeServiceKey(iface.getName(), rpcReference.version());
                    serviceKeyList.add(serviceKey);

                }
            }
        });

        // mult discovery
        if (xxlRpcInvokerFactory.getServiceRegistry() != null) {
            try {
                xxlRpcInvokerFactory.getServiceRegistry().discovery(serviceKeyList);
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
            }
        }

        return super.postProcessAfterInstantiation(bean, beanName);
    }

看看getObject做了什么吧

  • 配置一个动态代理(猜测就是调用远程服务用的)
  • 根据XxlRpcInvokerFactory配置的注册中心,查找provider地址
  • 通过通信框架,把数据发送到provider机器
  • NettyClientHandler获得响应的时候, 会调用futureResponse.setResponse(xxlRpcResponse);把拿到的response放进futureResponse里面
  • 再通过XxlRpcResponse xxlRpcResponse = futureResponse.get(timeout, TimeUnit.MILLISECONDS);拿到response(同步调用)
// ---------------------- util ----------------------

    public Object getObject() {
        return Proxy.newProxyInstance(Thread.currentThread()
                .getContextClassLoader(), new Class[] { iface },
                new InvocationHandler() {
                    @Override
                    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {

                        // method param
                        String className = method.getDeclaringClass().getName();    // iface.getName()
                        String varsion_ = version;
                        String methodName = method.getName();
                        Class<?>[] parameterTypes = method.getParameterTypes();
                        Object[] parameters = args;

                        // filter for generic
                        if (className.equals(XxlRpcGenericService.class.getName()) && methodName.equals("invoke")) {

                            Class<?>[] paramTypes = null;
                            if (args[3]!=null) {
                                String[] paramTypes_str = (String[]) args[3];
                                if (paramTypes_str.length > 0) {
                                    paramTypes = new Class[paramTypes_str.length];
                                    for (int i = 0; i < paramTypes_str.length; i++) {
                                        paramTypes[i] = ClassUtil.resolveClass(paramTypes_str[i]);
                                    }
                                }
                            }

                            className = (String) args[0];
                            varsion_ = (String) args[1];
                            methodName = (String) args[2];
                            parameterTypes = paramTypes;
                            parameters = (Object[]) args[4];
                        }

                        // filter method like "Object.toString()"
                        if (className.equals(Object.class.getName())) {
                            logger.info(">>>>>>>>>>> xxl-rpc proxy class-method not support [{}#{}]", className, methodName);
                            throw new XxlRpcException("xxl-rpc proxy class-method not support");
                        }

                        // address
                        String finalAddress = address;
                        if (finalAddress==null || finalAddress.trim().length()==0) {
                            if (invokerFactory!=null && invokerFactory.getServiceRegistry()!=null) {
                                // discovery
                                String serviceKey = XxlRpcProviderFactory.makeServiceKey(className, varsion_);
                                TreeSet<String> addressSet = invokerFactory.getServiceRegistry().discovery(serviceKey);
                                // load balance
                                if (addressSet==null || addressSet.size()==0) {
                                    // pass
                                } else if (addressSet.size()==1) {
                                    finalAddress = addressSet.first();
                                } else {
                                    finalAddress = loadBalance.xxlRpcInvokerRouter.route(serviceKey, addressSet);
                                }

                            }
                        }
                        if (finalAddress==null || finalAddress.trim().length()==0) {
                            throw new XxlRpcException("xxl-rpc reference bean["+ className +"] address empty");
                        }

                        // request
                        XxlRpcRequest xxlRpcRequest = new XxlRpcRequest();
                        xxlRpcRequest.setRequestId(UUID.randomUUID().toString());
                        xxlRpcRequest.setCreateMillisTime(System.currentTimeMillis());
                        xxlRpcRequest.setAccessToken(accessToken);
                        xxlRpcRequest.setClassName(className);
                        xxlRpcRequest.setMethodName(methodName);
                        xxlRpcRequest.setParameterTypes(parameterTypes);
                        xxlRpcRequest.setParameters(parameters);
                        
                        // send
                        if (CallType.SYNC == callType) {
                            // future-response set
                            XxlRpcFutureResponse futureResponse = new XxlRpcFutureResponse(invokerFactory, xxlRpcRequest, null);
                            try {
                                // do invoke
                                client.asyncSend(finalAddress, xxlRpcRequest);

                                // future get
                                XxlRpcResponse xxlRpcResponse = futureResponse.get(timeout, TimeUnit.MILLISECONDS);
                                if (xxlRpcResponse.getErrorMsg() != null) {
                                    throw new XxlRpcException(xxlRpcResponse.getErrorMsg());
                                }
                                return xxlRpcResponse.getResult();
                            } catch (Exception e) {
                                logger.info(">>>>>>>>>>> xxl-rpc, invoke error, address:{}, XxlRpcRequest{}", finalAddress, xxlRpcRequest);

                                throw (e instanceof XxlRpcException)?e:new XxlRpcException(e);
                            } finally{
                                // future-response remove
                                futureResponse.removeInvokerFuture();
                            }
                        } else if (CallType.FUTURE == callType) {
                            // future-response set
                            XxlRpcFutureResponse futureResponse = new XxlRpcFutureResponse(invokerFactory, xxlRpcRequest, null);
                            try {
                                // invoke future set
                                XxlRpcInvokeFuture invokeFuture = new XxlRpcInvokeFuture(futureResponse);
                                XxlRpcInvokeFuture.setFuture(invokeFuture);

                                // do invoke
                                client.asyncSend(finalAddress, xxlRpcRequest);

                                return null;
                            } catch (Exception e) {
                                logger.info(">>>>>>>>>>> xxl-rpc, invoke error, address:{}, XxlRpcRequest{}", finalAddress, xxlRpcRequest);

                                // future-response remove
                                futureResponse.removeInvokerFuture();

                                throw (e instanceof XxlRpcException)?e:new XxlRpcException(e);
                            }

                        } else if (CallType.CALLBACK == callType) {

                            // get callback
                            XxlRpcInvokeCallback finalInvokeCallback = invokeCallback;
                            XxlRpcInvokeCallback threadInvokeCallback = XxlRpcInvokeCallback.getCallback();
                            if (threadInvokeCallback != null) {
                                finalInvokeCallback = threadInvokeCallback;
                            }
                            if (finalInvokeCallback == null) {
                                throw new XxlRpcException("xxl-rpc XxlRpcInvokeCallback(CallType="+ CallType.CALLBACK.name() +") cannot be null.");
                            }

                            // future-response set
                            XxlRpcFutureResponse futureResponse = new XxlRpcFutureResponse(invokerFactory, xxlRpcRequest, finalInvokeCallback);
                            try {
                                client.asyncSend(finalAddress, xxlRpcRequest);
                            } catch (Exception e) {
                                logger.info(">>>>>>>>>>> xxl-rpc, invoke error, address:{}, XxlRpcRequest{}", finalAddress, xxlRpcRequest);

                                // future-response remove
                                futureResponse.removeInvokerFuture();

                                throw (e instanceof XxlRpcException)?e:new XxlRpcException(e);
                            }

                            return null;
                        } else if (CallType.ONEWAY == callType) {
                            client.asyncSend(finalAddress, xxlRpcRequest);
                            return null;
                        } else {
                            throw new XxlRpcException("xxl-rpc callType["+ callType +"] invalid");
                        }

                    }
                });
    }

回到前面的代码
getObject的代理之后,为field设置了代理,并把serviceKey(接口名+版本号)放在里一个set里面。

现先总结一下XxlRpcInvokerConfig干的事情吧

  1. 初始化一个XxlRpcSpringInvokerFactory 的bean
  2. 配置注册中心,通信框架,序列化框架
  3. 向注册中心注册
  4. @XxlRpcReference注解的field配置代理

是不是似曾相识,没错,和provider的config其实几乎一样。
那我们从consumer角度看看,一次调用是怎么完成的吧!看完了,前面的疑问应该都能解决了,吧
看看调用代码,用@XxlRpcReference注解了provider提供的接口
然后直接通过接口调用方法。
ok,那么所有猫腻都在@XxlRpcReference这个注解里面了

@Controller
public class IndexController {
    
    @XxlRpcReference
    private DemoService demoService;


    @RequestMapping("")
    @ResponseBody
    public UserDTO http(String name) {

        try {
            return demoService.sayHi(name);
        } catch (Exception e) {
            e.printStackTrace();
            return new UserDTO(null, e.getMessage());
        }
    }

}

先看看这个注解本身吧
给了几个默认值:默认使用netty,使用HESSIAN,使用同步调用,负责均衡方式是轮询

@Target({ElementType.FIELD})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
public @interface XxlRpcReference {

    NetEnum netType() default NetEnum.NETTY;
    Serializer.SerializeEnum serializer() default Serializer.SerializeEnum.HESSIAN;
    CallType callType() default CallType.SYNC;
    LoadBalance loadBalance() default LoadBalance.ROUND;

    //Class<?> iface;
    String version() default "";

    long timeout() default 1000;

    String address() default "";
    String accessToken() default "";

    //XxlRpcInvokeCallback invokeCallback() ;

}

搜一搜那里对这个注解做了处理吧
似曾相识,就是前面提到过的XxlRpcSpringInvokerFactorypostProcessAfterInstantiation方法!
所以整个调用过程应该就是调用代理方法的过程。
这样整个客户端调用过程就比较清晰了

  • 初始化的时候,配置客户端的通信框架,序列化框架,注册中心
  • 通过扫描@XxlRpcReference注解,初始化provider提供的接口的代理
  • 进行远程调用的时候,实际是代理调用
  • 通过代理通信协议客户端和注册中心,向provider请求数据

发一次请求,下个断点看看
果然进到这个代理调用里面了


看看服务端是怎么接收请求的吧,有了前面的服务端代码阅读和客户端调用代码阅读,其实服务端的就比较简单了
长话短说:
在装载XxlRpcSpringProviderFactory的时候会扫描@XxlRpcService注解的类,并把它作为service放进(put)服务map里面

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {

        Map<String, Object> serviceBeanMap = applicationContext.getBeansWithAnnotation(XxlRpcService.class);
        if (serviceBeanMap!=null && serviceBeanMap.size()>0) {
            for (Object serviceBean : serviceBeanMap.values()) {
                // valid
                if (serviceBean.getClass().getInterfaces().length ==0) {
                    throw new XxlRpcException("xxl-rpc, service(XxlRpcService) must inherit interface.");
                }
                // add service
                XxlRpcService xxlRpcService = serviceBean.getClass().getAnnotation(XxlRpcService.class);

                String iface = serviceBean.getClass().getInterfaces()[0].getName();
                String version = xxlRpcService.version();

                super.addService(iface, version, serviceBean);
            }
        }

        // TODO,addServices by api + prop

    }

当发生远程调用的时候,会调用invokeService方法,主要的代码就是这段通过反射来获取真正需要调用的方法

        try {
            // invoke
            Class<?> serviceClass = serviceBean.getClass();
            String methodName = xxlRpcRequest.getMethodName();
            Class<?>[] parameterTypes = xxlRpcRequest.getParameterTypes();
            Object[] parameters = xxlRpcRequest.getParameters();

            Method method = serviceClass.getMethod(methodName, parameterTypes);
            method.setAccessible(true);
            Object result = method.invoke(serviceBean, parameters);

            /*FastClass serviceFastClass = FastClass.create(serviceClass);
            FastMethod serviceFastMethod = serviceFastClass.getMethod(methodName, parameterTypes);
            Object result = serviceFastMethod.invoke(serviceBean, parameters);*/

            xxlRpcResponse.setResult(result);
        } catch (Throwable t) {
            // catch error
            logger.error("xxl-rpc provider invokeService error.", t);
            xxlRpcResponse.setErrorMsg(ThrowableUtil.toString(t));
        }

ok,到这里整个同步调用的流程就比较清楚了。我们在回顾一下它吹嘘的功能。

  • 1、快速接入:接入步骤非常简洁,两分钟即可上手;

确实还行

  • 2、服务透明:系统完整的封装了底层通信细节,开发时调用远程服务就像调用本地服务,在提供远程调用能力时不损失本地调用的语义简洁性;

通过扫描注解和反射的方式,做到了本地调用的效果

  • 3、多调用方案:支持 SYNC、ONEWAY、FUTURE、CALLBACK 等方案;

现在我们只试过同步调用(SYNC),接下来看看其他调用方式吧

ONEWAY

用ONEWAY模式拿到的返回值是null。
provider的代码如下,其实就是发起了一个不需要结果的调用

else if (CallType.ONEWAY == callType) {
    client.asyncSend(finalAddress, xxlRpcRequest);
    return null;
}

FUTURE

直接返回null,拿到结果要从future里面get

else if (CallType.FUTURE == callType) {
    // future-response set
    XxlRpcFutureResponse futureResponse = new XxlRpcFutureResponse(invokerFactory, xxlRpcRequest, null);
    try {
        // invoke future set
        XxlRpcInvokeFuture invokeFuture = new XxlRpcInvokeFuture(futureResponse);
        XxlRpcInvokeFuture.setFuture(invokeFuture);
        // do invoke
        client.asyncSend(finalAddress, xxlRpcRequest);
        return null;
    } catch (Exception e) {
        logger.info(">>>>>>>>>>> xxl-rpc, invoke error, address:{}, XxlRpcRequest{}", finalAddress, xxlRpcRequest);
        // future-response remove
        futureResponse.removeInvokerFuture();
        throw (e instanceof XxlRpcException) ? e : new XxlRpcException(e);
    }
}
    @RequestMapping("")
    @ResponseBody
    public UserDTO http(String name) {

        try {
            // dto is null
            UserDTO dto =  demoService.sayHi(name);
            Future<UserDTO> future = XxlRpcInvokeFuture.getFuture(UserDTO.class);
            return future.get();
        } catch (Exception e) {
            e.printStackTrace();
            return new UserDTO(null, e.getMessage());
        }
    }

callback

调用完成后会调用onSuccessonFailure方法

else if (CallType.CALLBACK == callType) {
    // get callback
    XxlRpcInvokeCallback finalInvokeCallback = invokeCallback;
    XxlRpcInvokeCallback threadInvokeCallback = XxlRpcInvokeCallback.getCallback();
    if (threadInvokeCallback != null) {
        finalInvokeCallback = threadInvokeCallback;
    }
    if (finalInvokeCallback == null) {
        throw new XxlRpcException("xxl-rpc XxlRpcInvokeCallback(CallType=" + CallType.CALLBACK.name() + ") cannot be null.");
    }
    // future-response set
    XxlRpcFutureResponse futureResponse = new XxlRpcFutureResponse(invokerFactory, xxlRpcRequest, finalInvokeCallback);
    try {
        client.asyncSend(finalAddress, xxlRpcRequest);
    } catch (Exception e) {
        logger.info(">>>>>>>>>>> xxl-rpc, invoke error, address:{}, XxlRpcRequest{}", finalAddress, xxlRpcRequest);
        // future-response remove
        futureResponse.removeInvokerFuture();
        throw (e instanceof XxlRpcException) ? e : new XxlRpcException(e);
    }
    return null;
}
XxlRpcInvokeCallback.setCallback(new XxlRpcInvokeCallback<UserDTO>() {
    @Override
    public void onSuccess(UserDTO result) {
        System.out.println(result);
    }
    @Override
    public void onFailure(Throwable exception) {
        exception.printStackTrace();
    }
});
demoService.sayHi("[CALLBACK]jack");

ps:这部分有很多值得深度阅读的地方,暂时todo

  • 4、多通讯方案:支持 TCP 和 HTTP 两种通讯方式进行服务调用;其中 TCP 提供可选方案 NETTY 或
    MINA ,HTTP 提供可选方案 NETTY_HTTP 或 Jetty;

通过继承同一接口来完成调用方的细节隐藏

  • 5、多序列化方案:支持 HESSIAN、HESSIAN1、PROTOSTUFF、KRYO、JACKSON 等方案;

和通信的方式差不多

  • 6、负载均衡/软负载:提供丰富的负载均衡策略,包括:轮询、随机、LRU、LFU、一致性HASH等;

这部分比较简单,不在赘述

  • 7、注册中心:可选组件,支持服务注册并动态发现;可选择不启用,直接指定服务提供方机器地址通讯;选择启用时,内置可选方案:“XXL-REGISTRY 轻量级注册中心”(推荐)、“ZK注册中心”、“Local注册中心”等;

前面已经提到

  • 8、服务治理:提供服务治理中心,可在线管理注册的服务信息,如服务锁定、禁用等;

通过注册中心实现

  • 9、服务监控:可在线监控服务调用统计信息以及服务健康状况等(计划中);

pass

  • 10、容错:服务提供方集群注册时,某个服务节点不可用时将会自动摘除,同时消费方将会移除失效节点将流量分发到其余节点,提高系统容错能力。

通过注册中心实现

  • 11、解决1+1问题:传统分布式通讯一般通过nginx或f5做集群服务的流量负载均衡,每次请求在到达目标服务机器之前都需要经过负载均衡机器,即1+1,这将会把流量放大一倍。而XXL-RPC将会从消费方直达
    服务提供方,每次请求直达目标机器,从而可以避免上述问题;

通过注册中心实现

  • 12、高兼容性:得益于优良的兼容性与模块化设计,不限制外部框架;除 spring/springboot 环境之外,理论上支持运行在任何Java代码中,甚至main方法直接启动运行;

demo里面有,比较简单,就是不通过反射,手动创建对象。

  • 13、泛化调用:服务调用方不依赖服务方提供的API;

同上

ok,就剩下一个todo了,那就是各种调用方案的具体实现
先todo了...

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,616评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,020评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,078评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,040评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,154评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,265评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,298评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,072评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,491评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,795评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,970评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,654评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,272评论 3 318
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,985评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,223评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,815评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,852评论 2 351

推荐阅读更多精彩内容