.NetCore利用BlockingCollection实现简易消息队列

消息队列现今的应用场景越来越大,常用的有RabbmitMQ和KafKa。
我们用BlockingCollection来实现简单的消息队列。

实现消息队列

用Vs2017创建一个控制台应用程序。创建DemoQueueBlock类,封装一些常用判断。

HasEle,判断是否有元素

Add向队列中添加元素

Take从队列中取出元素

为了不把BlockingCollection直接暴漏给使用者,我们封装一个DemoQueueBlock

    /// <summary>
    /// BlockingCollection演示消息队列
    /// </summary>
    /// <typeparam name="T"></typeparam>
    public class DemoQueueBlock<T> where T : class
    {
        private static BlockingCollection<T> Colls;
        public DemoQueueBlock()
        {

        }
        public static bool IsComleted() {
            if (Colls != null && Colls.IsCompleted) {
                return true;
            }
            return false;
        }
        public static bool HasEle()
        {
            if (Colls != null && Colls.Count>0)
            {
                return true;
            }
            return false;
        }
       
        public static bool Add(T msg)
        {
            if (Colls == null)
            {
                Colls = new BlockingCollection<T>();
            }
            Colls.Add(msg);
            return true;
        }
        public static T Take()
        {
            if (Colls == null)
            {
                Colls = new BlockingCollection<T>();
            }
            return Colls.Take();
        }
    }

    /// <summary>
    /// 消息体
    /// </summary>
    public class DemoMessage
    {
        public string BusinessType { get; set; }
        public string BusinessId { get; set; }
        public string Body { get; set; }
    }

添加元素进队列

通过控制台,添加元素

           //添加元素
            while (true)
            {
                Console.WriteLine("请输入队列");
                var read = Console.ReadLine();
                if (read == "exit")
                {
                    return;
                }

                DemoQueueBlock<DemoMessage>.Add(new DemoMessage() { BusinessId = read });
            }

消费队列

通过判断IsComleted,来确定是否获取队列

  Task.Factory.StartNew(() =>
            {
                //从队列中取元素。
                while (!DemoQueueBlock<DemoMessage>.IsComleted())
                {
                    try
                    {
                        var m = DemoQueueBlock<DemoMessage>.Take();
                      Console.WriteLine("已消费:" + m.BusinessId);
                    }
                    catch (Exception ex)
                    {
                        Console.WriteLine(ex.Message);
                    }
                }
            });

查看运行结果

运行结果

这样我们就实现了简易的消息队列。

示例源码

简易队列

参考链接

BlockingCollection
Orleans源码分析

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

  • 1、通过CocoaPods安装项目名称项目信息 AFNetworking网络请求组件 FMDB本地数据库组件 SD...
    阳明AI阅读 16,228评论 3 119
  • 上一年接触到的东西很多,总体上团队稳定,工作还算成功;也遇到很多得力的帮手。整体上个人成长与收获不小,但是略有虚浮...
    赵峰阅读 327评论 0 1
  • stand for代表,支持 particular (about/over sth)讲究,挑剔 attend to...
    augest阅读 143评论 0 0
  • 我的一堂伯父是逮鱼摸虾的高手,特别是捉黄鳝,更有绝活。别人都捉不着黄鳝,惟独他能捉到,而且捉到的黄鳝个头还大!但...
    小风徐徐阅读 290评论 3 3
  • 不管是过去的封建社会还是现代飞速发展的社会,这都是一个弱肉强食,仰望强者的世道。只是古代社会和现代社会对于强者的定...
    紫苏种花生阅读 812评论 0 0

友情链接更多精彩内容