API统计
在服务调用的时候,统计每个接口的调用次数,从而做到对接口的限流或统计。
在下面的代码中,使用了多线程的方式进行统计,主要使用了如下概念
- 线程池 Executor
- ConcurrentHashMap
- CountDownLatch
其中列举了四种实现方式
- 1 使用ConcurrentHashMap统计:不过该方法存在问题,统计的increase不是线程安全的,所以得到的结果不对
- 2 使用CAS理念对ConcurrentHashMap进行改进,从而解决自增方法increase的问题
- 3 使用Google的AtomicLongMap,原理同CAS一致,代码量小,比较优雅
- 4 对HashMap加锁ReentrantReadWriteLock
本文代码示例:countdownlatch-demo
使用ConcurrentHashMap统计
package concurrent;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* Java 并发实践- ConcurrentHashMap 与 CAS
* API调用次数统计
* 涉及概念: 多线程/线程池/ConcurrentHashMap/CountDownLatch
* @author billjiang
* @createTime 2017-08-04
*/
public class CounterDemo {
private final Map<String, Long> urlCounter = new ConcurrentHashMap<>();
/**
* 接口调用次数,此方法存在问题,ConcurrentHashMap的原子方法是同步的,但increase方法没有同步
* @param url
* @return
*/
public long increase(String url) {
Long oldValue=urlCounter.get(url);
Long newValue=(oldValue==null)?1l:oldValue+1;
urlCounter.put(url,newValue);
return newValue;
}
//获取调用次数
public long getCount(String url){
return urlCounter.get(url);
}
public static void main(String[] args) {
ExecutorService executorService= Executors.newFixedThreadPool(10);
final CounterDemo counterDemo=new CounterDemo();
int callTime=100000;
final String url="http://localhost:8082/test";
CountDownLatch countDownLatch=new CountDownLatch(callTime);
//模拟并发情况下的接口调用统计
for (int i = 0; i < callTime; i++) {
executorService.execute(new Runnable() {
@Override
public void run() {
counterDemo.increase2(url);
countDownLatch.countDown();
}
});
}
try{
countDownLatch.await();
}catch (InterruptedException e){
e.printStackTrace();
}
executorService.shutdown();
//等待所有线程统计完成后输出调用次数
System.out.println("调用次数:"+counterDemo.getCount(url));
}
}
从结果上看,使用ConcurrentHashMap存在问题,没有输出预期结果,这是因为ConcurrentHashMap虽然是线程安全的,不过它的线程安全指的是get
和put
等原子方法。而方法increase却不是线程安全的,当然可以通过对increase方法加锁(使用synchonized关键字),不过synchonized是悲观锁,其他线程要挂起等待,影响性能。可以使用类似乐观锁CAS对increase改进。
使用CAS对increase方法改进
关于CAS,可参考这篇文章:
改进后的increase方法如下:
/**
* CAS 乐观锁/自旋
* @param url
* @return
*/
public long increase2(String url){
Long oldValue,newValue;
while(true){
oldValue=urlCounter.get(url);
if(oldValue==null){
newValue=1l;
//初始化成功,退出循环
if(urlCounter.putIfAbsent(url,1l)==null)
break;
//如果初始化失败,说明其他线程已经初始化了
}else{
newValue=oldValue+1;
//+1成功,退出循环
if(urlCounter.replace(url,oldValue,newValue)){
break;
//如果+1失败,则说明其他线程已经修改过了旧值
}
}
}
return newValue;
}
不过还有更简单的方法,就是使用AtomicLongMap
使用Google的AtomicLongMap
AtomicLongMap<String> urlCounter3 = AtomicLongMap.create(); //线程安全,支持并发
public long increase3(String url){
return urlCounter3.incrementAndGet(url);
}
传统做法,对HashMap加锁
Map<String, Integer> map = new HashMap<String, Integer>(); //线程不安全
ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); //为map2增加并发锁
public long increase4(String url){
//对map2添加写锁,可以解决线程并发问题
lock.writeLock().lock();
try{
if(map.containsKey(key)){
map.put(key, map.get(key)+1);
}else{
map.put(key, 1);
}
}catch(Exception ex){
ex.printStackTrace();
}finally{
lock.writeLock().unlock();
}
}
上文中提到的CountDownLatch的概念可参考:
健康检查
场景:服务注册中心需要定时对服务提供者进行心跳检测,即定时调用服务提供者的特定借口,如果返回正常状态吗,则认为服务正常,否则,认为服务提供者异常,在注册中心显示为Down
状态,如Consul的服务健康检查机制与之类似。
下面使用CountDownLatch和线程池模拟这种实现。
思路
首先定义一个应用程序启动类,它开始时启动了n个线程类,这些线程将检查外部系统并通知闭锁,并且启动类一直在闭锁上等待着。一旦验证和检查了所有外部服务,那么启动类恢复执行。
实现
BaseHealthChecker:基础健康检查类,实现Runable接口,包含CountDownLatch, ServiceName(服务名称),ServiceUp(服务状态),其中verifyService 为具体继承该类的子类要实现的方法。
package concurrent.health;
import java.util.concurrent.CountDownLatch;
public abstract class BaseHealthChecker implements Runnable {
private CountDownLatch countDownLatch;
private String serviceName;
private boolean serviceUp;
public BaseHealthChecker(String serviceName,CountDownLatch countDownLatch){
super();
this.serviceName=serviceName;
this.countDownLatch=countDownLatch;
this.serviceUp=false;
}
@Override
public void run() {
try{
verifySerivce();
serviceUp=true;
}catch (Throwable t){
t.printStackTrace(System.err);
serviceUp=false;
}finally {
if(countDownLatch!=null)
countDownLatch.countDown();
}
}
public String getServiceName() {
return serviceName;
}
public boolean isServiceUp() {
return serviceUp;
}
//this method need to be implemented by all specific service checker
public abstract void verifySerivce();
}
DatabaseHealthChecker: 数据库健康检查类
package concurrent.health;
import java.util.concurrent.CountDownLatch;
public class DataBaseHealthChecker extends BaseHealthChecker {
public DataBaseHealthChecker(CountDownLatch countDownLatch) {
super("database service", countDownLatch);
}
@Override
public void verifySerivce() {
System.out.println("Checking " + this.getServiceName());
try {
Thread.sleep(7000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(this.getServiceName() + " is UP");
}
}
FileHealthChecker:文件服务健康检查(UserHealthChecker类似)
package concurrent.health;
import java.util.concurrent.CountDownLatch;
public class FileHealthChecker extends BaseHealthChecker {
public FileHealthChecker(CountDownLatch countDownLatch) {
super("file service", countDownLatch);
}
@Override
public void verifySerivce() {
System.out.println("Checking " + this.getServiceName());
try {
Thread.sleep(7000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(this.getServiceName() + " is UP");
}
}
ApplicationStartupUtil:服务注册中心调用发起方的主类,在系统启动的时候发起健康检测请求。
package concurrent.health;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ApplicationStartupUtil {
//list of service checker
private static List<BaseHealthChecker> checkers;
//this latch will be used to wait on
private static CountDownLatch countDownLatch;
//singleton
private ApplicationStartupUtil() {
}
private static ApplicationStartupUtil applicationStartupUtil = new ApplicationStartupUtil();
public static ApplicationStartupUtil getInstance() {
return applicationStartupUtil;
}
public static boolean checkExternalServices() throws InterruptedException {
//init the latch with the number of service checks
countDownLatch = new CountDownLatch(3);
//add all service checks into the list
checkers = new ArrayList<>();
checkers.add(new DataBaseHealthChecker(countDownLatch));
checkers.add(new UserHealthChecker(countDownLatch));
checkers.add(new FileHealthChecker(countDownLatch));
//start service checks using executor framework
ExecutorService executor = Executors.newFixedThreadPool(checkers.size());
for (BaseHealthChecker checker : checkers) {
executor.execute(checker);
}
//now wait all services checked
countDownLatch.await();
//service checkers are finished and now proceed startup
for (BaseHealthChecker checker : checkers) {
if (!checker.isServiceUp()) {
return false;
}
}
return true;
}
}
测试
测试方法
package concurrent.health;
public class TestMain {
public static void main(String[] args) {
boolean result = false;
try {
result = ApplicationStartupUtil.checkExternalServices();
} catch (Exception ex) {
ex.printStackTrace();
}
System.out.println("External services validation completed !! Result was :: " + result);
}
}
结果
Checking database service
Checking file service
Checking user service
database service is UP
user service is UP
file service is UP
External services validation completed !! Result was :: true
本文参考了什么时候使用CountDownLatch