C# 简单操作 RabbitMQ 发送与接收队列消息

一、前言

为了解藕下发指令功能,加入了 RabbitMQ 中间件,目的很简单,就是使用独立出来的消息中间件,使得两端的应用互不影响,你重启你的,我发送我的。

根据此应用场景,对组件功能的需求也比较简单。考虑到类似的简单场景也具备一定的通用性,于是进行简易封装。

期望的外部方法主要有:

  • 指定队列名称发送消息
  • 消息到达可指定处理方法

这里我们使用默认的 exchange,也即不涉及这个概念。

引用程序集,如:rabbitmq-dotnet-client-3.6.6-dotnet-4.5.zip

https://www.rabbitmq.com/releases/rabbitmq-dotnet-client/

二、先看使用方式

为了简洁,这里在同一个控制台上完成同一队列的发送与接收,手动输入发送内容,接收显示发送的内容。

(1)形成通话对象

参数如其名,将相应的信息作为参数传递即可。

RabbitMQQueueTalker rmt = new RabbitMQQueueTalker(
        host: "localhost",      
        username: "guest", 
        password: "guest", 
        sendQueueName: "QueueTalk", 
        receiveQueueName: "QueueTalk", 
        durable: false);

(2)绑定消息响应方法

// 消息接收响应
rmt.OnMessage(s => {
    Console.WriteLine(String.Format("receive message : {0}", s));
}); 

(3)发送消息

rmt.SendMessage(message);

完整调用过程

class Program
{
    static void Main(string[] args)
    {
        RabbitMQQueueTalker rmt = new RabbitMQQueueTalker(
            host: "localhost", 
            username: "guest", 
            password: "guest", 
            sendQueueName: "QueueTalk", 
            receiveQueueName: "QueueTalk", 
            durable: false);

        // 消息接收响应
        rmt.OnMessage(s => {
            Console.WriteLine(String.Format("receive message : {0}", s));
        }); 

        // 输入并发送消息
        while (true)
        {
            // input message
            string message = Console.ReadLine();
            if (rmt.SendMessage(message))
            {
                Console.WriteLine("send message :{0}", message);
            }
        }
    }
}

运行结果如下:

三、封装实现

3.1 启动连接

应用需要连接能够断线重连,这里定义 GetConnection 方法,实现在未建立时新建连接,建立之后则由断线重连属性 AutomaticRecoveryEnabled 完成断线之后的恢复。

private IConnection GetConnection()
{
    if (_Connection != null) return _Connection;

    var factory = new ConnectionFactory()
    {
        HostName = this.Host,
        UserName = this.UserName,
        Password = this.Password,
        RequestedHeartbeat = 10,
        AutomaticRecoveryEnabled = true
    };

    try
    {
        factory.RequestedConnectionTimeout = 6000;
        _Connection = factory.CreateConnection();

        // 阻塞解除之后检测接收通道是否还打开
        _Connection.ConnectionUnblocked += (o, e) => {
            BuildReceiveChannel();
        };

        return _Connection;
    }
    catch (Exception se)
    {
        Debug.WriteLine(se.Message);
        Debug.WriteLine(se.StackTrace);
        return null;
    }
}

3.2 发送通道与发送方法

默认队列,在通道建立的过程中进行队列声明 QueueDeclare

// 获取发送通道
private IModel GetSendChannel()
{
    if (_SendChannel != null && !_SendChannel.IsClosed) return _SendChannel;

    var conn = GetConnection();
    if (conn == null) return null;

    try
    {
        _SendChannel = conn.CreateModel();

        _SendChannel.QueueDeclare(queue: SendQueueName,
                            durable: Durable,
                            exclusive: false,
                            autoDelete: false,
                            arguments: null);

        return _SendChannel;
    }
    catch (Exception se)
    {
        Debug.WriteLine(se.Message);
        Debug.WriteLine(se.StackTrace);
        return null;
    }
}

对于临时指定队列名称的,则在方法中动态进行队列声明 QueueDeclare。开放的方法如下。

/// <summary>
/// 向默认发送队列发送消息
/// </summary>
/// <param name="message"></param>
/// <returns></returns>
public bool SendMessage(string message)
{
    return SendMessage(SendQueueName, message);
}

/// <summary>
/// 向指定队列发送消息
/// </summary>
/// <param name="queueName"></param>
/// <param name="message"></param>
/// <returns></returns>
public bool SendMessage(string queueName, string message)
{
    var channel = GetSendChannel();
    if (channel == null) return false;

    try
    {
        if (SendQueueName != queueName)
        {
            channel.QueueDeclare(queue: queueName,
                            durable: Durable,   
                            exclusive: false,
                            autoDelete: false,
                            arguments: null);
        }

        var body = Encoding.UTF8.GetBytes(message);

        // 发送消息
        channel.BasicPublish(exchange: "",
                                routingKey: queueName,
                                basicProperties: null,
                                body: body);
        return true;
    }
    catch (Exception se)
    {
        Debug.WriteLine(se.Message);
        Debug.WriteLine(se.StackTrace);
        return false;
    }
}

3.3 接收通道与接收方式

定义响应方法列表,用于在消息事件中逐个调用。

private List<Action<string>> ReceiveActionList = new List<Action<string>>();

建立接收通道,并绑定接收事件。

// 建立接收通道
private IModel BuildReceiveChannel()
{
    if (_ReceiveChannel != null && !_ReceiveChannel.IsClosed) return _ReceiveChannel;

    var conn = GetConnection();
    if (conn == null) return null;

    try
    {
        _ReceiveChannel = conn.CreateModel();

        _ReceiveChannel.QueueDeclare(queue: ReceiveQueueName,
                        durable: Durable,
                        exclusive: false,
                        autoDelete: false,
                        arguments: null);

        var consumer = new EventingBasicConsumer(_ReceiveChannel);

        // 绑定消息事件
        consumer.Received += (model, ea) =>
        {
            var message = Encoding.UTF8.GetString(ea.Body);
            foreach (Action<string> action in ReceiveActionList)
            {
                action(message);
            }                    
        };
        // 启动消费者
        _ReceiveChannel.BasicConsume(queue: ReceiveQueueName,
                                noAck: true,
                                consumer: consumer);

        return _ReceiveChannel;
    }
    catch (Exception se)
    {
        Debug.WriteLine(se.Message);
        Debug.WriteLine(se.StackTrace);
        return null;
    }
}

向外开放添加响应方法的功能。

/// <summary>
/// 添加消息到达响应方法
/// </summary>
/// <param name="action"></param>
public void OnMessage(Action<string> action)
{
    lock (ReceiveActionList)
    {
        ReceiveActionList.Add(action);
    }

    BuildReceiveChannel();
}

详细源码

https://github.com/triplestudio/helloworld/blob/master/RabbitMQTest/RabbitMQQueueTalker.cs

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,133评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,682评论 3 390
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,784评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,508评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,603评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,607评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,604评论 3 415
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,359评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,805评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,121评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,280评论 1 344
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,959评论 5 339
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,588评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,206评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,442评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,193评论 2 367
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,144评论 2 352