背景说明
几天前有个项目中的服务器cpu打满了,收到这个消息后,内心无比的担心(兴奋)。于是去检查服务器检查部署的拓扑文档,发现在这个服务器部署的是一个后台计算程序,主要逻辑就是给系统中的用户更新标签。结合用户的历史浏览记录。通过检查进程发现产生了非常多的进程在做计算服务
问题初步定位
出现这个问题的根本原因在于这个后台任务是通过操作系统的crontab进行调用的,出现了第一次调用没有执行完,第二次就又开始执行了,因此造成了一个崩塌,直到cpu被打满。解决这个问题无外乎就是提升性能,或者优化计算逻辑。俗话说靠堆机器解决问题程序员就失去了他的价值何况实际条件也是机器空间只有这么大,因此只能是从代码层面入手了 。
解决问题的思路,与要解决的问题
1、提升计算能力,之前的单线程计算要改造,改成多线程计算,有效的压榨cpu的算力
2、单线程情况下很多对象因为"线程封闭",或者"栈封闭",大家细细品下spring中单例情况下是不是编码规范中不能在service里面使用共享变量也是这个道理,往往是final的,也就是这个原因,李大爷的《Java并发编程实战》里面介绍了三种封闭模式:线程封闭,栈封闭,ThreadLocal封闭。
眼前面对的一个问题就是,由于标签计算用到了开源的ansj分词器,这个分词器肯定是有词典的这个地球人都知道,既然涉及到词典就会面临IO,既然面临IO就会面临磁盘读写,谈到这那么就要粗事情了,算法的同事"充分利用面向对象思想",将ansj的运算过程封装到了一个对象当中,也就是会出现 obj.setText("文本"), obj.getTag()分开调用,因此当我们共用obj这个对象时候就会出现并发问题,所以我们要解决obj的并发安全性问题。
3、因为是一个后台程序所以程序中没有使用tomcat等容器,相信大家也能感受到如果用crontab 拉起jar程序的话,入口就是main方法,但是我们希望自己构建一个定时任务那么我们就要求当前主线程不能关闭,所以这个时候我的解决办法是使用并发包中的CountDownLatch ,通过主线程在启动时候创建一个"倒计数门栓",让主线程在启动定时线程池后就阻塞,避免被创建出来的线程池还没执行就关掉了。
自己代码中构建一个简单的任务调度功能,也就是不再依靠crontab拉起计算程序,而是自己做一个定时任务,这个定时任务要同时满足状态检查,那么这时候第一个并发中常用的内容就登场了 volatile,我们用这个关键字做一个标志位来让并发任务的线程每次在运算前检查这个标志变量,为了简单我们可以把这个变量声明为类变量,放到方法区里面去,那么线程在做判断时候会因为内存屏障的原因去刷新这个标志位然后进行判断,保障一次运算任务要在前面一次运算任务结束后再拉起。在后台进行计算实行只要算不死就往死里算。。。。
4、如何尽可能复用已有代码,也就是Thread如何整合目前spring,说白了就是如何将spring容器引用给到线程中
基本面临的问题就是上面这四个棘手的问题,逐一破之
第一个问题跟第二个问题我们称之为并发改造
先将思路图挂上面
1、将原来单线程代码改造,抽离公共部分封装到一个类中,实现Runnable使之可以并发执行
public class ExtractKeywordThread implements Runnable{
/**
* 竞争资源获得分词器
*/
private ConcurrentLinkedQueue<LSI> lsis;
/**
* 集合用于储存结果
*/
ConcurrentHashMap<WeixinWebsite,Map<String,Double>> allResult;
/**
* 倒计数门栓引用
*/
private CountDownLatch countDownLatch;
/**
* 用户id
*/
private Integer personId;
/**
* 待计算的集合对象
*/
private List<WeixinWebsite> weixinWebsites;
@Override
public void run() {
LSI lsi = null;
try {
lsi = lsis.poll();
for (WeixinWebsite website : weixinWebsites) {
extractKeywordsFromNews(lsi,website);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
lsis.add(lsi);
countDownLatch.countDown();
}
}
public ExtractKeywordThread(ConcurrentLinkedQueue<LSI> lsis, CountDownLatch countDownLatch, Integer personId, List<WeixinWebsite> weixinWebsites) {
this.lsis = lsis;
this.countDownLatch = countDownLatch;
this.personId = personId;
this.weixinWebsites = weixinWebsites;
}
private Map<String, Double> extractKeywordsFromNews(LSI lsi, WeixinWebsite website) {
Map<String, Double> map = new HashMap<String, Double>();
try {
if (StringUtil.isBlank(website.getContent())) {
return Collections.emptyMap();
}
String text = StripHT(website.getContent());
if (text.length() < 200) {
return Collections.emptyMap();
}
if (StringUtil.isNotBlank(text)) {
try {
lsi.setText(text);
lsi.genShortSentences(); // 生成短句
lsi.genLongSentences(); // 生成长句
lsi.genTextDict(1); // 生成文章词典
lsi.genALongSentence(3, 0.1, 1); // 生成长句SVD的A矩阵
lsi.genMainSingularValue(0.5F); // 生成主要奇异值
lsi.genKeyWords(50);
List<WeightWord> keywords = lsi.getSeqKeyWords(10);
for (WeightWord word : keywords) {
map.put(word.getWord(), word.getWeight());
}
} catch (Exception e) {
}
}
} catch (Exception e) {
e.printStackTrace();
}
return map;
}
public static String StripHT(String strHtml) {
String txtcontent = strHtml.replaceAll("</?[^>]+>", ""); //剔出<html>的标签
txtcontent = txtcontent.replaceAll("<a>\\s*|\t|\r|\n</a>", "");//去除字符串中的空格,回车,换行符,制表符
return txtcontent;
}
}
详解一下这个类, private ConcurrentLinkedQueue<LSI> lsis;解决的其实是分词对象并发的问题,因为分词器的线程安全问题,所以采用了空间换时间的解决方案,这个地方第一感觉是使用线程级别的绑定更嗨皮,这样可以避免线程争抢,但是面临一个问题就是ThreadLocal可能存在的内存泄漏问题。
java 并发编程中提到 "只有当线程本地值的生命周期受限于任务的生命周期时,在线程池的线程中使用ThreadLocal才有意义,而在线程池的线程中不应该使用ThreadLocal在任务之间传递值"。解释出来就是你如果是用在task里面封装的变量那么就应该在任务结束完了以后remove掉,不然就会出问题。我们这个案例中希望公用的话其实应该可以着手在线程池创建时候给线程添加进去,奈何并无此入口,在ThreadFactory中笔者尝试找相关切入点未果,所以换一种思路。使用一个资源池 ConcurrentLinkedQueue,每次运算任务执行完以后我们归还LSI对象,这样一个分词对象资源池就能在线程池里面进行共享了,我TM真是个人才。
2、第二个问题关于自见一个定时任务线程池,核心逻辑就是使用一个定时线程池,另外有一个标志位防止无效的并发,一开始的想法是用阻塞队列,但是后来想阻塞队列也会有积压,所以直接用标志位是最好的解决方案
@Service
public class ScheduledRunner implements BeanFactoryAware{
/**
* 通过spring生命周期注入beanfactory,避免多次启动容器,因为main方法中已经初始化了容器
*/
private BeanFactory beanFactory;
/**
* 用于定时任务定时执行,避免主线程退出或者使用系统级别的定时任务的重复拉起问题,我们的解决思路变为使用线程池常驻内存,使用
* volatile 关键字进行上一次任务执行情况标志,避免重复拉起计算线程,造成的计算机资源耗尽
*/
private static CountDownLatch countDownLatch = new CountDownLatch(1);
/**
* 单线程就可以,主要是触发任务,控制任务的执行程度,计算单元其实是在 workerPool中执行的。
*/
private static ScheduledExecutorService scheduledPool = Executors.newScheduledThreadPool(1);
/**
将变量存储于方法区通过volatile修饰完成状态的可见性
*/
public static volatile boolean run = false;
public void run() {
scheduledPool.scheduleAtFixedRate(new BuudooTagsTask(beanFactory),0, 15, TimeUnit.MINUTES);
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
this.beanFactory = beanFactory;
}
}
我们使用一个倒计数门栓这样就将主线程阻塞住了,但是在阻塞之前我们拉起了另外一个定时线程。BuudooTagsTask。所以可以让定时线程池一直在内存中活跃。我们实现了BeanFactoryAware让主线程再加载spring容器时候帮我们注入spring容器引入给当前对象,并且传递给定时任务线程,使定时线程能够尽可能使用框架避免自己写jdbc了等那些东西。
public class BuudooTagsTask implements Runnable{
private static final Logger LOG = LoggerFactory.getLogger(BuudooTagsTask.class);
private BeanFactory factory;
@Override
public void run() {
//通过spring容器获得执行逻辑
try {
if(!ScheduledRunner.run) {
LOG.info("当前没有并行任务可以进行标签运算");
ScheduledRunner.run = true;
BuudooTagsProc proc = factory.getBean("buudooTagsProc", BuudooTagsProc.class);
proc.calculation();
LOG.info("标签运算完成");
ScheduledRunner.run = false;
}
} catch (Exception e) {
LOG.error("BuudooTagsProc error", e);
}
}
public BuudooTagsTask(BeanFactory beanFactory) {
this.factory = beanFactory;
}
}
这个类进行计算标志的判别,如果已经计算了那么将pass此次任务的拉起,如果是当前无任务,则通过注入进来的spring上下文获取标签计算的对象,执行计算操作。
@Service
public class BuudooTagsProc {
private static Logger LOG = Logger.getLogger(BuudooTagsProc.class);
private static final int PAGE_SIZE = 100; //分页查询默认大小
@Value("#{settings['KEYWORD_EXTRACT_COUNT_PER_NEWS']}")
private int KEYWORD_EXTRACT_COUNT_PER_NEWS; //每篇文章抽取关键词数量
@Value("#{settings['DEFAULT_PERSON_TAGS_COUNT']}")
private int DEFAULT_PERSON_TAGS_COUNT; //最终用户打标签数
@Value("#{settings['RECENT_SIZE']}")
private int RECENT_SIZE; //计算最近的资讯数量
private static LSI lsi;
@Value("#{settings['nlp_dic_dir']}")
private String dicDir;
@Resource
private BuudooDAO buudooDAO;
@PostConstruct
public void init() {
//初始化4个资源进到阻塞队列中用作资源池,服务器是4c,
// lsi = new LSI(dicDir);
for (int i = 0; i < 4; i++) {
source.add(new LSI(dicDir));
}
}
//声明一个阻塞队列,用作资源的重复利用
private ConcurrentLinkedQueue<LSI> source = new ConcurrentLinkedQueue<LSI>();
//初始化一个4个线程的线程池
private ExecutorService workerPool = Executors.newFixedThreadPool(4);
/**
* @author guozc
* 计算用户数据标签
*/
public void calculation() {
try {
//查询最新同步id
int lastId = buudooDAO.getLastId();
int pageNo = 1;
List<Integer> persons = buudooDAO.getPersons(lastId, (pageNo-1)*PAGE_SIZE);
if (persons.isEmpty()) {
return;
}
CountDownLatch countDownLatch = new CountDownLatch(persons.size());
while(!persons.isEmpty()) {
for (Integer personId : persons) {
//计算标签并且保存
//使用线程池进行计算按照以人为单位进行拆分,如果按照微站作为任务粒度可能太小,造成切换与调度的阻塞有些大。
ExtractKeywordThread task = new ExtractKeywordThread(source,countDownLatch,personId,buudooDAO.getUserBrowse(personId,RECENT_SIZE));
workerPool.execute(task);
}
countDownLatch.await();
LOG.info("buudoo page = " + pageNo + " has done");
System.out.println( "buudoo page = " + pageNo + " has done");
pageNo++;
persons = buudooDAO.getPersons(lastId, (pageNo - 1) * PAGE_SIZE);
countDownLatch = new CountDownLatch(persons.size());
}
//取得最新id数保存
int maxId = buudooDAO.getMaxId();
if (maxId > lastId) {
buudooDAO.saveRecord(buudooDAO.getMaxId());
}
} catch (Exception e) {
LOG.error("NewsTagsProc calculation error", e);
}
}
}
然后进行抽取运算,此时会创建一个线程池出来,测试中使用的创建一个4 core的线程池,因为过多的并发线程肯定是会造成浪费的,所以选择固定并发数将是一个不错的选择,另外为了防止主线程过多的丢入任务,造成的任务队列被无限添加,所以在代码中再次使用的CountDownLatch ,目的在于,假设一次加载了100个人的计算任务,那么线程池里面最多是10个在执行的任务,还有90个排队的,不至于造成排队链表过长,过长没有什么意义。
最后一个类是单次运算最小的任务单元
public class ExtractKeywordThread implements Runnable{
/**
* 竞争资源获得分词器
*/
private ConcurrentLinkedQueue<LSI> lsis;
/**
* 集合用于储存结果
*/
ConcurrentHashMap<WeixinWebsite,Map<String,Double>> allResult;
/**
* 倒计数门栓引用
*/
private CountDownLatch countDownLatch;
/**
* 用户id
*/
private Integer personId;
/**
* 待计算的集合对象
*/
private List<WeixinWebsite> weixinWebsites;
@Override
public void run() {
LSI lsi = null;
try {
lsi = lsis.poll();
for (WeixinWebsite website : weixinWebsites) {
extractKeywordsFromNews(lsi,website);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
if (lsi != null) {
//资源确实被获取了,那么要归还资源
lsis.add(lsi);
}
countDownLatch.countDown();
}
}
public ExtractKeywordThread(ConcurrentLinkedQueue<LSI> lsis, CountDownLatch countDownLatch, Integer personId, List<WeixinWebsite> weixinWebsites) {
this.lsis = lsis;
this.countDownLatch = countDownLatch;
this.personId = personId;
this.weixinWebsites = weixinWebsites;
}
private Map<String, Double> extractKeywordsFromNews(LSI lsi, WeixinWebsite website) {
Map<String, Double> map = new HashMap<String, Double>();
try {
if (StringUtil.isBlank(website.getContent())) {
return Collections.emptyMap();
}
String text = StripHT(website.getContent());
if (text.length() < 200) {
return Collections.emptyMap();
}
if (StringUtil.isNotBlank(text)) {
try {
lsi.setText(text);
lsi.genShortSentences(); // 生成短句
lsi.genLongSentences(); // 生成长句
lsi.genTextDict(1); // 生成文章词典
lsi.genALongSentence(3, 0.1, 1); // 生成长句SVD的A矩阵
lsi.genMainSingularValue(0.5F); // 生成主要奇异值
lsi.genKeyWords(50);
List<WeightWord> keywords = lsi.getSeqKeyWords(10);
for (WeightWord word : keywords) {
map.put(word.getWord(), word.getWeight());
}
} catch (Exception e) {
}
}
} catch (Exception e) {
e.printStackTrace();
}
return map;
}
public static String StripHT(String strHtml) {
String txtcontent = strHtml.replaceAll("</?[^>]+>", ""); //剔出<html>的标签
txtcontent = txtcontent.replaceAll("<a>\\s*|\t|\r|\n</a>", "");//去除字符串中的空格,回车,换行符,制表符
return txtcontent;
}
}
至此一个单线程的定时任务就被我们改造成多线程并发的任务了。
待优化以及完善的地方
1、线程池与资源池的灵活配置
笔者因为进行了一些简单测试找到了线程池的大小设置,这里里面有一些小技巧就是如果是IO密集型的那么线程池就设置的大一些,如果是计算密集型的就将线程池数量设置小一些,因为io密集型会有io加载过程,那么cpu会有空闲时间,所以建议调大线程池并发数。而计算密集型则应该尽可能减少线程切换。这位置可以根据实际情况调整为最适合当前机器的参数。
2、如果并发还解决不了怎么办?
那就将标签抽取从运算变成查询,目前项目中每个人看到的每个内容都会进行运算,本身会存在大量重复的运算,那么我们可以考虑先将每个内容先标签化好,然后每个人只是去查询自己阅读历史的标签,这样进行运算,速度会得到本质的提升。应该性能可以提升十几倍甚至几十倍,笔者观察了一下,抽取的耗时由于内容长度不同大概在几十到几百ms不等如果是redis查询可能最多是几ms
3、如果预处理还不行该怎么办?
这说明单台机器已经无法完成运算了,此时我们可以考虑使用引入消息队列,通过一个线程以生产者向消息队列中派发任务,然后横向扩展N台计算机作为计算节点,通过消息队列进行并发控制,另外可以适当进行任务压缩,让一条信息包含十几个甚至几十个任务,这样降低多节点在消息队列的竞争,提升并发性。
下一篇预告:在改造中遇到的坑,笔者如何使用阿里开源的Arthas进行性能追踪的是如何确定我们任务的粒度的这里面涉及到观察线程池线程状态确定我们任务粒度是否合理,以及一些简单的JVM参数设置,观察GC等基础命令的使用