多线程在微服务API统计和健康检查中的使用

API统计

在服务调用的时候,统计每个接口的调用次数,从而做到对接口的限流或统计。

在下面的代码中,使用了多线程的方式进行统计,主要使用了如下概念

  • 线程池 Executor
  • ConcurrentHashMap
  • CountDownLatch

其中列举了四种实现方式

  • 1 使用ConcurrentHashMap统计:不过该方法存在问题,统计的increase不是线程安全的,所以得到的结果不对
  • 2 使用CAS理念对ConcurrentHashMap进行改进,从而解决自增方法increase的问题
  • 3 使用Google的AtomicLongMap,原理同CAS一致,代码量小,比较优雅
  • 4 对HashMap加锁ReentrantReadWriteLock

本文代码示例:countdownlatch-demo

使用ConcurrentHashMap统计

package concurrent;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Java 并发实践- ConcurrentHashMap 与 CAS
 * API调用次数统计
 * 涉及概念: 多线程/线程池/ConcurrentHashMap/CountDownLatch
 * @author billjiang 
 * @createTime 2017-08-04
 */
public class CounterDemo {
    private final Map<String, Long> urlCounter = new ConcurrentHashMap<>();



    /**
     * 接口调用次数,此方法存在问题,ConcurrentHashMap的原子方法是同步的,但increase方法没有同步
     * @param url
     * @return
     */
    public long increase(String url) {
        Long oldValue=urlCounter.get(url);
        Long newValue=(oldValue==null)?1l:oldValue+1;
        urlCounter.put(url,newValue);
        return newValue;
    }

    //获取调用次数
    public long getCount(String url){
        return urlCounter.get(url);
    }

    public static void main(String[] args) {
        ExecutorService executorService= Executors.newFixedThreadPool(10);
        final CounterDemo counterDemo=new CounterDemo();
        int callTime=100000;
        final String url="http://localhost:8082/test";
        CountDownLatch countDownLatch=new CountDownLatch(callTime);

        //模拟并发情况下的接口调用统计
        for (int i = 0; i < callTime; i++) {
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    counterDemo.increase2(url);
                    countDownLatch.countDown();
                }
            });
        }

        try{
            countDownLatch.await();
        }catch (InterruptedException e){
            e.printStackTrace();
        }

        executorService.shutdown();

        //等待所有线程统计完成后输出调用次数
        System.out.println("调用次数:"+counterDemo.getCount(url));

    }
}

ConcurrentHashMap

从结果上看,使用ConcurrentHashMap存在问题,没有输出预期结果,这是因为ConcurrentHashMap虽然是线程安全的,不过它的线程安全指的是getput等原子方法。而方法increase却不是线程安全的,当然可以通过对increase方法加锁(使用synchonized关键字),不过synchonized是悲观锁,其他线程要挂起等待,影响性能。可以使用类似乐观锁CAS对increase改进。

使用CAS对increase方法改进

关于CAS,可参考这篇文章:

深入浅出Java并发包—CAS机制

改进后的increase方法如下:

  /**
     * CAS 乐观锁/自旋
     * @param url
     * @return
     */
    public long increase2(String url){
        Long oldValue,newValue;
        while(true){
            oldValue=urlCounter.get(url);
            if(oldValue==null){
                newValue=1l;
                //初始化成功,退出循环
                if(urlCounter.putIfAbsent(url,1l)==null)
                    break;
                //如果初始化失败,说明其他线程已经初始化了
            }else{
                newValue=oldValue+1;
                //+1成功,退出循环
                if(urlCounter.replace(url,oldValue,newValue)){
                    break;
                    //如果+1失败,则说明其他线程已经修改过了旧值
                }
            }
        }
        return newValue;
    }

不过还有更简单的方法,就是使用AtomicLongMap

使用Google的AtomicLongMap

AtomicLongMap<String> urlCounter3 = AtomicLongMap.create(); //线程安全,支持并发
public long increase3(String url){
     return urlCounter3.incrementAndGet(url);
}

传统做法,对HashMap加锁

 Map<String, Integer> map = new HashMap<String, Integer>(); //线程不安全
 ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); //为map2增加并发锁

 public long increase4(String url){
    //对map2添加写锁,可以解决线程并发问题
        lock.writeLock().lock();
    try{
        if(map.containsKey(key)){
            map.put(key, map.get(key)+1);
        }else{
            map.put(key, 1);
        }
    }catch(Exception ex){
        ex.printStackTrace();
    }finally{
        lock.writeLock().unlock();
    }
 }

上文中提到的CountDownLatch的概念可参考:

CountDownLatch

什么时候使用CountDownLatch

健康检查

场景:服务注册中心需要定时对服务提供者进行心跳检测,即定时调用服务提供者的特定借口,如果返回正常状态吗,则认为服务正常,否则,认为服务提供者异常,在注册中心显示为Down状态,如Consul的服务健康检查机制与之类似。

下面使用CountDownLatch和线程池模拟这种实现。

思路

首先定义一个应用程序启动类,它开始时启动了n个线程类,这些线程将检查外部系统并通知闭锁,并且启动类一直在闭锁上等待着。一旦验证和检查了所有外部服务,那么启动类恢复执行。

实现

BaseHealthChecker:基础健康检查类,实现Runable接口,包含CountDownLatch, ServiceName(服务名称),ServiceUp(服务状态),其中verifyService 为具体继承该类的子类要实现的方法。

package concurrent.health;

import java.util.concurrent.CountDownLatch;

public abstract class BaseHealthChecker implements Runnable {

    private CountDownLatch countDownLatch;

    private String serviceName;

    private boolean serviceUp;

    public BaseHealthChecker(String serviceName,CountDownLatch countDownLatch){
        super();
        this.serviceName=serviceName;
        this.countDownLatch=countDownLatch;
        this.serviceUp=false;
    }

    @Override
    public void run() {
        try{
            verifySerivce();
            serviceUp=true;
        }catch (Throwable t){
            t.printStackTrace(System.err);
            serviceUp=false;
        }finally {
            if(countDownLatch!=null)
                countDownLatch.countDown();
        }

    }


    public String getServiceName() {
        return serviceName;
    }

    public boolean isServiceUp() {
        return serviceUp;
    }

    //this method need to be implemented by all specific service checker
    public abstract void verifySerivce();

}

DatabaseHealthChecker: 数据库健康检查类

package concurrent.health;

import java.util.concurrent.CountDownLatch;

public class DataBaseHealthChecker extends BaseHealthChecker {

    public DataBaseHealthChecker(CountDownLatch countDownLatch) {
        super("database service", countDownLatch);
    }

    @Override
    public void verifySerivce() {
        System.out.println("Checking " + this.getServiceName());
        try {
            Thread.sleep(7000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(this.getServiceName() + " is UP");
    }
}

FileHealthChecker:文件服务健康检查(UserHealthChecker类似)

package concurrent.health;

import java.util.concurrent.CountDownLatch;

public class FileHealthChecker extends BaseHealthChecker {

    public FileHealthChecker(CountDownLatch countDownLatch) {
        super("file service", countDownLatch);
    }

    @Override
    public void verifySerivce() {
        System.out.println("Checking " + this.getServiceName());
        try {
            Thread.sleep(7000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(this.getServiceName() + " is UP");
    }
}

ApplicationStartupUtil:服务注册中心调用发起方的主类,在系统启动的时候发起健康检测请求。

package concurrent.health;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ApplicationStartupUtil {
    //list of service checker
    private static List<BaseHealthChecker> checkers;

    //this latch will be used to wait on
    private static CountDownLatch countDownLatch;

    //singleton
    private ApplicationStartupUtil() {

    }

    private static ApplicationStartupUtil applicationStartupUtil = new ApplicationStartupUtil();

    public static ApplicationStartupUtil getInstance() {
        return applicationStartupUtil;
    }

    public static boolean checkExternalServices() throws InterruptedException {
        //init the latch with the number of service checks
        countDownLatch = new CountDownLatch(3);

        //add all service checks into the list
        checkers = new ArrayList<>();
        checkers.add(new DataBaseHealthChecker(countDownLatch));
        checkers.add(new UserHealthChecker(countDownLatch));
        checkers.add(new FileHealthChecker(countDownLatch));

        //start service checks using executor framework
        ExecutorService executor = Executors.newFixedThreadPool(checkers.size());
        for (BaseHealthChecker checker : checkers) {
            executor.execute(checker);
        }

        //now wait all services checked
        countDownLatch.await();

        //service checkers are finished and now proceed startup
        for (BaseHealthChecker checker : checkers) {
            if (!checker.isServiceUp()) {
                return false;
            }
        }
        return true;


    }
}

测试

测试方法

package concurrent.health;

public class TestMain {
    public static void main(String[] args) {
        boolean result = false;
        try {
            result = ApplicationStartupUtil.checkExternalServices();
        } catch (Exception ex) {
            ex.printStackTrace();
        }
        System.out.println("External services validation completed !! Result was :: " + result);
    }

}

结果

Checking database service
Checking file service
Checking user service
database service is UP
user service is UP
file service is UP
External services validation completed !! Result was :: true

本文参考了什么时候使用CountDownLatch

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,772评论 6 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,458评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,610评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,640评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,657评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,590评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,962评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,631评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,870评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,611评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,704评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,386评论 4 319
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,969评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,944评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,179评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,742评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,440评论 2 342

推荐阅读更多精彩内容

  • 从三月份找实习到现在,面了一些公司,挂了不少,但最终还是拿到小米、百度、阿里、京东、新浪、CVTE、乐视家的研发岗...
    时芥蓝阅读 42,169评论 11 349
  • layout: posttitle: 《Java并发编程的艺术》笔记categories: Javaexcerpt...
    xiaogmail阅读 5,787评论 1 19
  • 相关概念 面向对象的三个特征 封装,继承,多态.这个应该是人人皆知.有时候也会加上抽象. 多态的好处 允许不同类对...
    东经315度阅读 1,917评论 0 8
  • 慢慢等,会来的,总会来的。 雷雨,狂风。萧萧,飒爽,吞噬。 站台,静等。匆匆,人群,褪淡。 午夜临,一丝亮光,车来...
    张某某T阅读 360评论 2 1
  • 认识你是一次偶然,那天我在自助咖啡机前面排队,到我的时候,我翻遍全身也找不到零钱,明明放在上衣口袋里的。后面的同学...
    吴大仁阅读 349评论 0 1