基于ignite的分布式框架

项目背景

最近在做公司项目的微服务改造,在dubbo和spring-cloud这两个主流的微服务框架之前技术选型徘徊了好久,两个框架在性能,设计,社区支持都非常地完美。然而我为什么要选择自己去封装ignite去实现RPC服务呢?

  • 原因一:
    公司缺乏专门的运维人员去维护第三方的中间件服务,例如 Redis,Zookeeper,Rabbitmq等等的这些中间件。
  • 原因二:
    应用本身要求的高并发不多,但是需要做高可用,引入dubbo和spring-cloud视乎有点小题大做的感觉
  • 原因三:
    使用dubbo和spring-cloud对现有项目的代码改造工作量有点大,本身公司项目有部分已经基于ignite做服务网格了
  • 原因四:
    工作这么久了,还是要尝试封装一下框架,所以这个框架是我的处女做,希望大家能够批评和指正

项目概述

底层基于apache ignite,特点是可以做到不依赖外部中间件,实现RPC服务分布式缓存分布式计算分布式消息等功能特性
框架也基于ignite的集群管理,实现了基于集群组的颗粒度的服务调用,即针对集群组的调用

  • JDK和Spring boot版本
    JDK版本为1.8
    Spring boot 版本要求1.5.3以上

框架说明

服务中启动的 Spring boot 应用同时启动了ignite的server和client模式,注入到了Spring容器中

    @Autowired
    @Qualifier("igniteClient")
    private Ignite igniteClient;

    @Autowired
    @Qualifier("igniteServer")
    private Ignite igniteServer;

因此你可以无缝地使用框架没有封装的ignite功能,更多的ignite的功能,请参考中文官网
https://www.ignite-service.cn/doc/java/

功能概述

  • RPC服务
  • 分布式消息
  • 分布式广播
  • 分布式计算

quick-start

构建

git clone https://github.com/konglinghai123/ignite-spring-project.git
cd ignite-spring-boot-starter
mvn clean install

构建一个基于ignite的 spring boot 项目

  • 添加依赖:
   <dependency>
        <groupId>com.github.kong.spring.boot</groupId>
        <artifactId>ignite-spring-boot-starter</artifactId>
        <version>1.0</version>
   </dependency>
  • 在application-yml添加ignite的相关配置信息,样例配置如下:

  • zookeeper 发现

#zookeeper发现
ignite-cluster:
  name: hello_client_1 #节点名称
  role: client
  des: 测试服务
  zookeeperUrl: 192.168.56.100:2181
  localAddress: 127.0.0.1
  localPort: 47600
  • 动态ip发现
ignite-cluster:
  name: hello_server #节点名称
  role: server
  des: 测试服务端
  multicast-group: 224.0.1.111 #组播地址
  localAddress: 127.0.0.1
  localPort: 48600
  • 为了开发方便,如果Spring boot Appliction 类的不在包名com.github.kong目录下,接下来在Spring Boot Application的上添加@ComponentScan("com.github.kong.*"),这样idea可以通过看到一些Bean是否已经注入了,当然也可以不添加,框架也有写扫描注入
@SpringBootApplication
@ComponentScan("com.github.kong.*")
public class HelloWorldServerApplication {
    public static void main(String[] args) {
        SpringApplication.run(HelloWorldServerApplication.class);
    }
}

RPC服务的创建与消费

发布服务基于ignite的RPC服务

  • 编写你的ignite服务,需要添加要发布的服务实现上添加@IgniteRpcService注解,继承IgniteService.
  • HelloWorld 是定义的接口
@Service
@IgniteRpcService(des = "这是一个例子")
public class HelloWorldService extends IgniteService implements HelloWorld {


}
  • @IgniteRpcService 注解的定义
/**
 * 服务提供者注解
 */
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
public @interface IgniteRpcService {

    /**
     * @return
     */
    String version() default "1.0";

    //接口描述
    String des() default "";

    //单个节点部署的实例数
    int maxPerNodeCount() default 1;

    //整个集群部署的最大实例数,0:无限制
    int total() default 0;

}
  • 启动你的Spring Boot应用,观察控制台,可以看到ignite启动相关信息.

调用已经发布的RPC服务

  • Spring boot 应用配置同上,唯一不同的是,需要更改配置
#zookeeper发现
ignite-cluster:
  name: hello_client_1 #节点名称 (必须在集群中唯一)
  • 通过@IgniteRpcReference注入需要使用的interface.
@Controller
public class HelloWorldController {

    @IgniteRpcReference
    private HelloWorld helloWorldService;

    @RequestMapping("/helloworld")
    @ResponseBody
    public String test(){
        return helloWorldService.sayHello("kong");
    }

}
  • 调用不同版本的RPC服务
 @IgniteRpcReference(version = "1.1")
    private HelloWorld helloWorldService;
  • @IgniteRpcReference 注解的定义
/**
 * 网格服务注入注解
 */
@Target({ElementType.FIELD})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
public @interface IgniteRpcReference {

    String version() default "1.0";

    //默认使用负载均衡
    boolean isLoadbalance() default true;

    //默认不设超时
    long timeout() default 0;
}

分布式消息

分布式消息是基于内存的消息订阅系统,如果需要持久化请使用外部的消息系统

定义话题消费者

@Service
@IgniteMessageListener(topic = "hello",isBroadcast = false)
public class HelloWorldMessage implements IgniteMessageRecevicer<String> {

    @Override
    public boolean apply(UUID uuid, MessageModel<String> messageModel) {
        System.out.println(messageModel);
        return true;
    }
}
  • @IgniteMessageListener 注解的定义
/**
 * 服务提供者注解
 */
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
public @interface IgniteMessageListener {


    //消息主题
    String topic();

    //消息描述
    String des() default "";

    //是否针对集群内的所有节点(是否允许重复消费)
    boolean isBroadcast() default true;

}
  • MessageModel 是一个消息封装,发送消息时必须用它来发送

发送话题消息

@Controller
@RequestMapping("/message")
public class MessageController {

    @Autowired
    private IgniteMessageSender sender;

    @RequestMapping("/sayHello")
    @ResponseBody
    public String test(){
        sender.toRemote("hello", new MessageModel<>("1212"));
        return "1212";
    }
}
  • IgniteMessageSender是框架注入的Bean,可以直接引用

<span id="BroadCast">分布式广播</span>

分布式广播是指:对集群组的所有节点发送消息,然后获取所有节点返回的结果,原来是基于ignite的分布式闭包利用反射机制调用spring容器内Bean的方法

发送一个分布式广播

@Controller
@RequestMapping("/broadcast")
public class BroadcastController {

    @Autowired
    private BroadcastServiceExecutor broadcastServiceExecutor;

    @RequestMapping("/sayHello")
    @ResponseBody
    public List<String> test(){
        return (List<String>) broadcastServiceExecutor.broadcast("server", TestBroadService.class,"sayHello","12123");
    }
}
  • BroadcastServiceExecutor是框架注入的Bean,可以直接引用

  • broadcast 方法提供3个方法定义

   /**
     * 向其他集群广播
     * @param targetRole 集群标识
     * @param targetClass api类
     * @param methodName 方法名称
     * @param args 参数
     * @return
     */
    public List broadcast(String targetRole,Class targetClass,String methodName,Object... args){...}

      /**
         * 向远端集群广播消息
         * @param targetClass
         * @param methodName
         * @param args
         * @return
         */
     public List broadcastRemote(Class targetClass,String methodName,Object... args){...}


        /**
          * 向集群内广播消息
          * @param targetClass
          * @param methodName
          * @param args
          * @return
          */
      public List broadcastLocal(Class targetClass,String methodName,Object... args){...}

分布式计算

分布式计算允许用户执行基于内存的Map-Reduce任务

  • 创建 Map-Reduce 任务 ,需继承ComputeTaskSplitAdapter(import org.apache.ignite.compute.ComputeTaskSplitAdapter),泛型<T,R>
  • T:入参,R:返回类型
//字数统计测试
@Service
public class MapExampleCharacterCountTask  extends ComputeTaskSplitAdapter<List<String>,Integer>  {


    @Nullable
    @Override
    public Integer reduce(List<ComputeJobResult> results) throws IgniteException {
        return results.stream().mapToInt(ComputeJobResult::<Integer>getData).sum();
    }

    @Override
    protected Collection<? extends ComputeJob> split(int gridSize, List<String> arg) throws IgniteException {
        LinkedList jobs = new LinkedList();

        List<List<String>> list = CollectionUtils.split(arg,10000);

        for(final List<String> words : list){
            jobs.add(new ComputeJobAdapter() {
                @Override
                public Object execute() throws IgniteException {
                    int i = 0;
                    for(String s : words){
                        i = i + s.length();
                    }
                    return i;
                }
            });
        }

        return jobs;
    }
}

  • 执行 Map-Reduce 任务
@Controller
@RequestMapping("/mr")
public class MRTestController {

    @Autowired
    private MapReduceTaskExecutor<List<String>,Integer> mapReduceTaskExecutor;

    @RequestMapping("/test")
    @ResponseBody
    public Object test(){
        try {

            List<String> records = new ArrayList<>();
            // 创建CSV读对象
            CsvReader csvReader = new CsvReader(new FileInputStream("D:\\data\\cs2.csv"), Charset.forName("GBK"));

            while (csvReader.readRecord()){
                // 读一整行
                records.add(csvReader.getRawRecord());
            }

            List<String> bigRecords = new ArrayList<>();
            for(int i = 0; i < 5; i++){
                bigRecords.addAll(records);
            }
            return mapReduceTaskExecutor.execute(MapExampleCharacterCountTask.class,bigRecords);
        } catch (IOException e) {
            e.printStackTrace();
        }

        return "";
    }

}
  • MapReduceTaskExecutor是框架注入的Bean,可以直接引用

集群管理api

框架注入了IgniteManager这个bean,可以实现以下功能

public interface IgniteManager {

    /**
     * 获取节点列表
     *
     * @return
     */
    List<NodeInfo> list();

    /**
     * 获取节点的详细信息
     *
     * @param nodeId
     * @return
     */
    ClusterMetrics info(String nodeId);

    /**
     * 获取微服务的基本信息
     *
     * @return
     */
    List<ServiceInfo> servieInfos();

    /**
     * 集群消息信息
     * @return
     */
    List<MessageInfo> messagInfos();

}
  • 使用@Autowired 注入即可
@Controller
@RequestMapping("/admin")
public class AdminCotroller {

    @Autowired
    private IgniteManager igniteManager;

    @RequestMapping("/nodes")
    @ResponseBody
    public List<NodeInfo> nodes(){
        return igniteManager.list();
    }


    @RequestMapping("/nodeInfo/{id}")
    @ResponseBody
    public ClusterMetrics info(@PathVariable("id") String id){
        return igniteManager.info(id);
    }

    @RequestMapping("/services")
    @ResponseBody
    public List<ServiceInfo> services(){
        return igniteManager.servieInfos();
    }

}
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,992评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,212评论 3 388
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,535评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,197评论 1 287
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,310评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,383评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,409评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,191评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,621评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,910评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,084评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,763评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,403评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,083评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,318评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,946评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,967评论 2 351

推荐阅读更多精彩内容