问题引出
多线程对共享对象并发访问时,可能有线程安全问题,那么我们如何保护共享对象有且只有一个线程访问,实现对共享变量访问互斥的效果。
在java中,最基本的互斥同步方法就是使用synchronized关键字,synchronized可以用于修饰类的实例方法,静态方法和代码块,每个对象都可以关联一把锁,修饰的实例方法,锁是指当前对象,修饰的的静态方法,锁是指当前类对象。那锁讲究是什么,有什么属性和方法,是如何实现线程互斥的?下面可以结合信号量和管程来说明java锁的实现。
信号量
信号量是操作系统提供的一种互斥共享资源访问的方法,它的实现如下:
```
Class Semaphore {
int sem; //二进制信号量中 资源数目为0或1
WaitQueue q;//获取不到锁,线程放进等待队列
}
Semaphore :: P() { // 原子操作加锁
sem --;
if (sem<0) {
//把当前线程放到等待队列中,线程阻塞状态,放弃cpu执行
block(P);
}
}
Semaphore :: V() {//原子操作解锁
sem ++;
if (sem<=0) {
//从等待队列中移除一个线程,并唤醒线程,使其变成可运行状态
wakeup(t);
}
}
```
信号量的实例化
mutex = new Semaphore(1);
mutex -> P();
// 临界区保护代码
mutex -> V();
[if !supportLists]· [endif]p()方法说明:第一个线程进来之后,由1变成0,能进去;若第二个线程也想进入临界区,由0变成-1,在P操作进入等待状态
[if !supportLists]· [endif]v()方法说明:第一次线程执行结束,进行V操作,使信号量计数由-1变成0, 唤醒第二个进程,则第二个进程就会在第一个进程结束时进入临界区
管程
管程是一种用于多线程互斥访问共享资源的程序结构采用面向对象方法,简化了线程的同步机制。有一个锁和多个条件变量组成。锁控制对共享对象的互斥访问,条件变量用于线程间的同步合作。
管程的定义
class Monitor{
Semaphore lock;//锁
List<Conditioin> condition;//条件变量,一个管程可以有多个条件变量
}
条件变量的定义
class Condition{
int numWating=0;//等待线程的数量
WaitQueue q;//条件等待队列
}
Condition::Wait(lock){
numWating++;
release(lock);//释放锁
schedule();
require(lock);
}
Condition::Signal(){
if(numWating>0){
wakeup(t);// 唤醒等待队列的线程
numWaiting--;
}
}
[if !supportLists]· [endif]一个条件变量包含有一个等待队列。
[if !supportLists]· [endif]wait() 方法会释放锁,把当前线程放到条件等待队列,释放cpu
[if !supportLists]· [endif]signal()方法将等待队列中的一个线程唤醒
[if !supportLists]· [endif]一个管程可以包含2种队列,一种是互斥等待队列,一种是条件等待队列
管程生产者和消费者模型
Class BoundedBuffer { // 共享缓冲变量
Monitor lock;
int count = 0; //当前缓冲大小
int total=100;//总缓冲大小
lock.Condition full,
lock.Condition empty;
}
BoundedBuffer :: Product(c) {
lock -> Acquire();
while (count == total)
full.Wait(&lock);
//生产一个商品加入缓冲
count ++;
empty.Signal();
lock -> Release();
}
BoundedBuffer :: comsume(c) {
lock -> Acquire(c);
while (count == 0)
empty.Wait(&lock);
//消费一个商品
count --;
full.Signal();
lock -> Release();
}
(1) 生产者生产商品时,需要先获取锁,这个锁就是当前管程封装的锁,如果缓冲区满了,当前线程条调用wait()方法,会释放当前锁,且把当前线程放入full条件等待队列,如果缓冲区没满,那么生产商品,同时唤醒empty条件队列中的线程,在释放锁资源。
(2)消费者消费商品同样先要获取锁,判断当前是否有商品消费,如果没有,那么需要在empty条件队列等待,如果有商品,先消费一个商品,在唤醒full条件队列的线程,说明当前消费了商品,需要唤醒full中的线程,通知生产线程继续生产,从而达到线间同步协作。
(3)在调用条件变量的wait方法时都要求要先获取锁,且需要在判断条件时用while判断,当唤醒了线程,线程从wait 方法返回时候,需要在此判断条件,所以一定要用到while循环,这个和java中的Object的wait方法使用要求是一致的。
java synchronized 原理
每个对象都有一把锁,这个锁我们可以理解为管程,java对象是通过对象头中的Mark word中的指针来找到管程对象(ObjectMonitor)在jvm源码hotspot\src\share\vm\runtime可以找到这个管程对象
ObjectMonitor() {
_recursions = 0;
_owner = NULL;
_WaitSet = NULL;
_EntryList = NULL ;
}
[if !supportLists]· [endif]_recursions:记录owner线程获取锁的次数,这也决定了synchronized是可重入的。
[if !supportLists]· [endif]_owner:指向拥有该对象的线程, 通过cas 设置当前线程,设置成功说明当前线程获取锁
[if !supportLists]· [endif]_WaitSet:存放调用wait方法时 线程进入条件等待队列。
[if !supportLists]· [endif]_EntryList:存放等待锁而被阻塞的线程队列。
进入synchronized执行enter方法
void ATTR ObjectMonitor::enter(TRAPS) {
Thread * const Self = THREAD ;
void * cur ;
//用cas方法设置ObjectMonitor中的_owner为当前线程
cur = Atomic::cmpxchg_ptr (Self, &_owner, NULL) ;
if (cur == NULL) {
return ;
}
if (cur == Self) {
//可重入
_recursions ++ ;
return ;
}
//如果当前线程是第一次进入该monitor,设置_recursions为1,_owner为当前线程
if (Self->is_lock_owned ((address)cur)) {
_recursions = 1 ;
_owner = Self ;
OwnerIsThread = 1 ;
return ;
}
}
调用object.wait方法实现
oid ObjectSynchronizer::wait(Handle obj, jlong millis, TRAPS) {
//.获得Object的管程对象
ObjectMonitor* monitor = ObjectSynchronizer::inflate(THREAD, obj());
DTRACE_MONITOR_WAIT_PROBE(monitor, obj(), THREAD, millis);
//3.调用monitor的wait方法
monitor->wait(millis, true, THREAD);
}
//在wait方法中调用addWaiter方法
inline void ObjectMonitor::AddWaiter(ObjectWaiter* node) {
if (_WaitSet == NULL) {
//_WaitSet为null,就初始化_waitSet
_WaitSet = node;
node->_prev = node;
node->_next = node;
} else {
//否则就尾插
ObjectWaiter* head = _WaitSet ;
ObjectWaiter* tail = head->_prev;
tail->_next = node;
head->_prev = node;
node->_next = head;
node->_prev = tail;
}
//执行ObjectMonitor::exit释放锁
}
调用object notify方法实现
void ObjectSynchronizer::notify(Handle obj, TRAPS) {
//调用ObjectSynchronizer::inflate方法
ObjectSynchronizer::inflate(THREAD, obj())->notify(THREAD);
}
//通过inflate方法得到ObjectMonitor对象
ObjectMonitor * ATTR ObjectSynchronizer::inflate (Thread * Self, oop object) {
if (mark->has_monitor()) {
ObjectMonitor * inf = mark->monitor() ;
assert (inf->header()->is_neutral(), "invariant");
assert (inf->object() == object, "invariant") ;
assert (ObjectSynchronizer::verify_objmon_isinpool(inf), "monitor is inva;lid");
return inf
}
}
//调用ObjectMonitor的notify方法
void ObjectMonitor::notify(TRAPS) {
//调用DequeueWaiter方法移出_waiterSet第一个结点
ObjectWaiter * iterator = DequeueWaiter() ;
//后面省略是将上面DequeueWaiter尾插入_EntrySet的操作