[Java并发编程] 用 信号量(Semaphore) 实现一个消息池(含代码)

欲穷千里目,更上一层楼。—唐·王之涣《登颧雀楼》
这句诗的意思是:想看到更远更广阔的景物,你就要再上一层楼。想学到更多更深的知识,你就要比原来更努力。

PS: 如果觉得本文有用的话,请帮忙点赞,留言评论支持一下哦,您的支持是我最大的动力!谢谢啦~

Semaphore,计数信号量,用来控制同时访问某个特定资源的线程数量,需要我们设定它的最大访问数量。 Semaphore 管理着一组虚拟许可,许可的初始数量可以通过构造函数来指定。在执行操作时可以首先获取许可,并在使用后释放许可。如果没有许可,那么获取操作将阻塞直到有可用的许可。

Semaphore 可以用于实现一个资源池,也可以将任何一个容器变成一个有界的阻塞容器,他在限制资源访问量上有很大的用处。

Semaphore 的核心方法

首先,我们先来看它的两个构造函数。

/**
 * Creates a {@code Semaphore} with the given number of
 * permits and nonfair fairness setting.
 */
public Semaphore(int permits) {
    sync = new NonfairSync(permits);
}
/**
 * Creates a {@code Semaphore} with the given number of
 * permits and the given fairness setting.
 */
public Semaphore(int permits, boolean fair) {
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

参数 permits 表示许可数量,即同时允许多少个线程访问。参数 fair 表示公平性,即等待越久越先获取到许可。

其次,再来看一下它获取和释放许可的方法,信号量的核心用法就是下面这些。

//获取一个许可
public void acquire() throws InterruptedException {  }

//获取permits个许可
public void acquire(int permits) throws InterruptedException { }

   //释放一个许可
public void release() { }    
     
//释放permits个许可
public void release(int permits) { }    

//尝试获取一个许可,若获取成功,则立即返回true,若获取失败,则立即返回false
public boolean tryAcquire() { };    

//尝试获取一个许可,若在指定的时间内获取成功,则立即返回true,否则则立即返回false  
public boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException { };  

   //尝试获取permits个许可,若获取成功,则立即返回true,若获取失败,则立即返回false
public boolean tryAcquire(int permits) { };

//尝试获取permits个许可,若在指定的时间内获取成功,则立即返回true,否则则立即返回false
public boolean tryAcquire(int permits, long timeout, TimeUnit unit) throws InterruptedException { }; 

使用场景

前面说过,Semaphore 可以用于实现一个资源池。所以,我们用它来实现一个固定数量的消息池,只允许固定数量的线程同时访问。

这个例子中消息池的数量为 3 个,信号量的许可数量也设置为 3 个,即用 Semaphore 来控制最多同时只能有三个线程使用,其中消息可以循环使用。

如果已有三个线程已经获取到了消息,那么其他线程获取消息的时候将会阻塞,直到有线程释放消息,它才能获取到。获取消息和释放消息通过 Semaphore 的 acqure() 和 release() 方法进行控制,其中 Semaphore 的许可数量不应大于消息池的最大数量。

import java.util.concurrent.Semaphore;

public class SemaphoreTest{

//表示消息池可用消息只有5个
private static final int MAX_POOL_SIZE = 3;

//获取消息的客户端的线程数量
private static final int CLIENT_SIZE = 6;   

//消息数组,存放所有消息
private static Message[] messages = new Message[MAX_POOL_SIZE];   

//信号量,许可数量为消息的最大可用数量
private static Semaphore semaphore = new Semaphore(MAX_POOL_SIZE);

//初始化消息数组
static void  init() {
    for(int i = 0; i < MAX_POOL_SIZE; i++) {
        messages[i] = new Message();
    }
}

//同步方法,获取可用的消息
static synchronized Message obtain() {
    Message msg = null;
    for(int i = 0; i < MAX_POOL_SIZE; i++) {
        if(messages[i].getFlag() == false) {
            msg = messages[i];
            msg.setId(i);
            msg.setFlag(true);
            return msg;
        }
    }
    return msg;
}

//同步方法,把用完的消息放回消息池
static synchronized boolean release(Message msg) {
    if(msg.getFlag() == true) {
        msg.setFlag(false);
        msg.setId(-1);
        return true;
    }
    return false;
}

//用信号量控制能获取消息的数目
static Message obtainMsg() throws InterruptedException {
    semaphore.acquire();
    return obtain();
}

//成功释放消息的同时释放信号量
static void releaseMsg(Message msg) {
    System.out.print(Thread.currentThread().getName() + " ***Release msg id*** = "+ msg.getId() + "\n");
    if(release(msg)) {
        semaphore.release();
    }
}

public static void main(String[] args) {
    
    //初始化
    init();
    
    //创建子线程,获取消息
    for(int i = 0; i < CLIENT_SIZE; i++) {
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    //获取消息
                    Message msg = obtainMsg();
                    System.out.print(Thread.currentThread().getName() + " Obtain msg id = "+ msg.getId() + "\n");
                    //假装耗时操作
                    Thread.sleep(1000);
                    //释放消息
                    releaseMsg(msg);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }}).start();
    }
}

//声明一个消息类
static class Message {
    private int id;         //表示每个消息的id
    private boolean flag;   //表示消息是否可用

    public Message() {
        this.id = -1;
        this.flag = false;
    }
    
    public void setId(int id) {
        this.id = id;
    }
    
    public void setFlag(boolean b) {
        this.flag = b;
    }
    
    public int getId() {
        return this.id;
    }
    
    public boolean getFlag() {
        return this.flag;
    }
}

}

执行结果:


这里写图片描述

从这个结果看,线程 1,线程 2,线程 0 先获取到消息;接着线程 1 和 2 释放消息;释放消息后,那么此时消息池又有两个空闲消息,所以,线程 3 和线程 5 获取了消息;

紧接着线程 0 释放消息,线程 4 立马获取了消息。。。

这程序的执行结果和我们预期的流程一样。需要注意的点,Semaphore 是线程安全的。在这个例子中,不可能同时有 4 个线程能同时获取到消息。

注意

既然 Semaphore 是线程安全的,为什么上面两个方法需要添加同步?

static synchronized boolean release(Message msg)
static synchronized Message obtain()

这里我们不能混淆概念,Semaphore 的线程安全是指同时只能有三个线程进入,即 acquire() 和 release() 必定线程安全。然而获取到许可后的操作不保证线程安全,所以这里加同步锁是为了确保获取消息的过程是安全的。

另外一点需要注意,为什么下面两个方法不需要使用同步锁?

static void releaseMsg(Message msg)
static Message obtainMsg() throws InterruptedException

细心的朋友可能已经知道,这里加上同步的话,会产生死锁。假如此时 acquire() 发生阻塞,那么obtainMsg() 一直持有同步锁,而 releaseMsg() 的时候必须等待同步锁的释放,这时必定陷入死锁,一直死等,然而没什么软用。
这里不需要加同步锁,是因为我们要确保安全的内容是 获取许可集 后的数据安全,和释放许可集之前的数据安全。

本文完结,如果觉得有帮助,请关注我哦,谢谢啦~

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,904评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,581评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,527评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,463评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,546评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,572评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,582评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,330评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,776评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,087评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,257评论 1 344
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,923评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,571评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,192评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,436评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,145评论 2 366
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,127评论 2 352

推荐阅读更多精彩内容

  • layout: posttitle: 《Java并发编程的艺术》笔记categories: Javaexcerpt...
    xiaogmail阅读 5,813评论 1 19
  • 推荐本人正在使用的高效管理软件,只做介绍,具体功能自行探索。 首先,要断定一个软件值得使用,就要看软件有没有以下几...
    鱼塘up阅读 3,175评论 1 1
  • 互加计划给我的礼物!迫不及待地要分享! 还有半个小时,无论有多么的不舍,2017终将还是要离我们而去,2018,我...
    贵阳修文陈海燕48397阅读 157评论 0 0
  • 1 无效的平均数 “去掉一个最高分,一个最低分,最终平均分是X”是不是听着很耳熟?可惜生活的规则没有比赛那么单一,...
    朱桃子阅读 218评论 0 2
  • 一周读一本书,并随笔记录,是假期我的计划之一,今天是读这本书的最后一天。 掩卷后其实要说的面很多,每一个面我也能一...
    Odelia阅读 298评论 0 0