线程各个状态

线程各个状态.png
java并发中常见的锁
- 偏向锁,轻量级锁,重量级锁
- 偏向锁: 其核心的思想是,如果程序没有竞争,则取消之前已经取得锁的线程同步操作。也就是锁消除
- 轻量级锁: 其他线程会通过自旋的形式尝试获取锁,不会阻塞,提高性能,自旋等待
- 重量级锁:synchronized这类的,追求吞吐量。同步块执行速度较长。
- 乐观锁,悲观锁
- 乐观锁,认为多线程竞争不激烈,竞争不激烈的情况下, 在数据库表中增加版本号,先查数据,然后根据之前的版本号更新这条数据,找得到更新,找不到不更,这就是乐观锁的实现。
- 悲观锁: 认为多线程竞争激烈。
- 公平锁,非公平锁
- sychronized为非公平锁,锁获取随机, 公平锁的锁获取是根据申请时间的,非公平锁处理的快。
- 可重入锁
- 可重入锁又名递归锁,是指在同一个线程在外层方法获取锁的时候,在进入内层方法会自动获取锁。
- 独享锁/共享锁
- 独享锁是指该锁一次只能被一个线程所持有;共享锁是指该锁可被多个线程所持有。
- 对于Java ReentrantLock而言,其是独享锁。但是对于Lock的另一个实现类ReadWriteLock,其读锁是共享锁,其写锁是独享锁。
- 分段锁
- 分段锁其实是一种锁的设计,并不是具体的一种锁,对于ConcurrentHashMap而言,其并发的实现就是通过分段锁的形式来实现高效的并发操
CountDownLatch的使用,以及多线程读取文件
- 多线程读取文件是一个文件被打开多次的场景。
- CountDownLatch用于主线程等待其他线程执行完之后再执行,看代码
public class TestMultiRead {
private static final int DOWN_THREAD_NUM = 10;//起10个线程去读取指定文件
private static final String OUT_FILE_NAME = "d:\\倚天屠龙记.txt";
private static final String keywords = "的";
public static void main(String[] args) {
//jdk1.5线程辅助类,让主线程等待所有子线程执行完毕后使用的类,
//另外一个解决方案:自己写定时器,个人建议用这个类
CountDownLatch doneSignal = new CountDownLatch(DOWN_THREAD_NUM);
RandomAccessFile[] outArr = new RandomAccessFile[DOWN_THREAD_NUM];
try {
long length = new File(OUT_FILE_NAME).length();
System.out.println("文件总长度:" + length + "字节");
//每线程应该读取的字节数
long numPerThred = length / DOWN_THREAD_NUM;
System.out.println("每个线程读取的字节数:" + numPerThred + "字节");
//整个文件整除后剩下的余数
long left = length % DOWN_THREAD_NUM;
for (int i = 0; i < DOWN_THREAD_NUM; i++) {
//为每个线程打开一个输入流、一个RandomAccessFile对象,
//让每个线程分别负责读取文件的不同部分
outArr[i] = new RandomAccessFile(OUT_FILE_NAME, "rw");
if (i != 0) {
//
// isArr[i] = new FileInputStream("d:/勇敢的心.rmvb");
//以指定输出文件创建多个RandomAccessFile对象
}
if (i == DOWN_THREAD_NUM - 1) {
// //最后一个线程读取指定numPerThred+left个字节
// System.out.println("第"+i+"个线程读取从"+i * numPerThred+"到"+((i + 1) * numPerThred+ left)+"的位置");
new ReadThread(i * numPerThred, (i + 1) * numPerThred
+ left, outArr[i], keywords, doneSignal).start();
} else {
//每个线程负责读取一定的numPerThred个字节
// System.out.println("第"+i+"个线程读取从"+i * numPerThred+"到"+((i + 1) * numPerThred)+"的位置");
new ReadThread(i * numPerThred, (i + 1) * numPerThred,
outArr[i], keywords, doneSignal).start();
}
}
} catch (Exception e) {
e.printStackTrace();
}
// finally{
//
// }
//确认所有线程任务完成,开始执行主线程的操作
try {
doneSignal.await();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
//这里需要做个判断,所有做read工作线程全部执行完。
KeyWordsCount k = KeyWordsCount.getCountObject();
// Map<String,Integer> resultMap = k.getMap();
System.out.println("指定关键字出现的次数:" + k.getCount());
}
}
public class ReadThread extends Thread {
//定义字节数组(取水的竹筒)的长度
private final int BUFF_LEN = 256;
//定义读取的起始点
private long start;
//定义读取的结束点
private long end;
//将读取到的字节输出到raf中 randomAccessFile可以理解为文件流,即文件中提取指定的一部分的包装对象
private RandomAccessFile raf;
//线程中需要指定的关键字
private String keywords;
//此线程读到关键字的次数
private int curCount = 0;
/**
* jdk1.5开始加入的类,是个多线程辅助类
* 用于多线程开始前统一执行操作或者多线程执行完成后调用主线程执行相应操作的类
*/
private CountDownLatch doneSignal;
public ReadThread(long start, long end, RandomAccessFile raf,String keywords,CountDownLatch doneSignal){
this.start = start;
this.end = end;
this.raf = raf;
this.keywords = keywords;
this.doneSignal = doneSignal;
}
public void run(){
try {
raf.seek(start);
System.out.println("每次开始的 " + start);
//本线程负责读取文件的大小
long contentLen = end - start;
//定义最多需要读取几次就可以完成本线程的读取
long times = contentLen / BUFF_LEN+1;
System.out.println(this.toString() + " 需要读的次数:"+times);
byte[] buff = new byte[BUFF_LEN];
int hasRead = 0;
String result = null;
for (int i = 0; i < times; i++) {
//之前SEEK指定了起始位置,这里读入指定字节组长度的内容,read方法返回的是下一个开始读的position
hasRead = raf.read(buff);
//如果读取的字节数小于0,则退出循环! (到了字节数组的末尾)
if (hasRead < 0) {
break;
}
result = new String(buff,"gb2312");
/// System.out.println(result);
int count = this.getCountByKeywords(result, keywords);
if(count > 0){
this.curCount += count;
}
}
KeyWordsCount kc = KeyWordsCount.getCountObject();
kc.addCount(this.curCount);
doneSignal.countDown();//current thread finished! noted by latch object!
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
public long getStart() {
return start;
}
public void setStart(long start) {
this.start = start;
}
public long getEnd() {
return end;
}
public void setEnd(long end) {
this.end = end;
}
public RandomAccessFile getRaf() {
return raf;
}
public void setRaf(RandomAccessFile raf) {
this.raf = raf;
}
public int getCountByKeywords(String statement,String key){
return statement.split(key).length-1;
}
public int getCurCount() {
return curCount;
}
public void setCurCount(int curCount) {
this.curCount = curCount;
}
public CountDownLatch getDoneSignal() {
return doneSignal;
}
public void setDoneSignal(CountDownLatch doneSignal) {
this.doneSignal = doneSignal;
}
}
public class KeyWordsCount {
private static KeyWordsCount kc;
private int count = 0;
private KeyWordsCount(){
}
public static synchronized KeyWordsCount getCountObject(){
if(kc == null){
kc = new KeyWordsCount();
}
return kc;
}
public synchronized void addCount(int count){
System.out.println("增加次数:"+count);
this.count += count;
}
public int getCount() {
return count;
}
public void setCount(int count) {
this.count = count;
}
}
CountDownLatch和CyclicBarrier的区别
- CountDownLatch基于AQS,是递减的方式等待所有线程执行完,不可重复使用。 CyclicBarrier是递增的形式,可重复使用,CyclicBarrier基于ReentrantLock
BufferedInputStream多线程安全性
- 当BufferedInputStream new出来之后,用for执行线程池线程模拟多线程场景。一个文件只被打开一次,区别于一个文件被多次打开的场景。
- BufferedInputStream的public方法除了close方法外,其他都加上synchronized以保证线程安全。close方法不加synchronized是因为synchronized是锁在对象上,如果read方法卡住了,close就没办法获取到锁,就执行不了了,这不符合close规则。
- 看下close方法, buf是volatile 就是为了保证可见性,bufUpdater 使用cas就是为了配合close方法。bufUpdater有两个地方使用,一个close方法一个fill方法, close会cas不断尝试,fill一旦更新失败就抛异常
protected volatile byte buf[];
AtomicReferenceFieldUpdater<BufferedInputStream, byte[]> bufUpdater =
AtomicReferenceFieldUpdater.newUpdater
(BufferedInputStream.class, byte[].class, "buf");
public void close() throws IOException {
byte[] buffer;
while ( (buffer = buf) != null) {
if (bufUpdater.compareAndSet(this, buffer, null)) {
InputStream input = in;
in = null;
if (input != null)
input.close();
return;
}
// Else retry in case a new buf was CASed in fill()
}
}
private void fill() throws IOException {
if (!bufUpdater.compareAndSet(this, buffer, nbuf)) {
throw new IOException("Stream closed");
}
}
- AtomicReferenceFieldUpdater能对类中某一个volatile属性进行cas,看代码
/**
* 一个基于反射的工具类,它能对指定类的指定的volatile引用字段进行原子更新
*/
public class TestAtomicReferenceFieldUpdater {
public static void main(String[] args) {
AtomicReferenceFieldUpdater updater = AtomicReferenceFieldUpdater.newUpdater(Person.class, String.class, "name");
Person person = new Person();
updater.compareAndSet(person, person.name, "Kylin");
System.out.println("------->" + person.name);
}
}
public class Person {
volatile String name = "lx";
}