RabbitMQ六种工作模式.NET代码封装

Git 地址 https://gitee.com/delaywu/neter.git
查看具体用法

1 创建RabbitMq连接数据

 /// <summary>
    /// 默认获取 Neter:RabbitMq 节点
    /// </summary>
    public class RabbitMqOptions
    {
        public RabbitMqConnectorOptions Connector { get; set; }
        public List<RabbitMqHostOptions> Host { get; set; }
    }

    public class RabbitMqConnectorOptions
    {
        /// <summary>
        /// 用户名
        /// </summary>
        public string UserName { get; set; }

        /// <summary>
        /// 登录密码
        /// </summary>
        public string Password { get; set; }

        /// <summary>
        /// 虚拟主机
        /// 使用 虚拟主机 缓存连接对象
        /// </summary>
        public string VirtualHost { get; set; }
    }

    /// <summary>
    /// 集群 
    /// </summary>
    public class RabbitMqHostOptions
    {
        public string Host { get; set; }
        public int Port { get; set; } 
    }

2 定义 RabbitMq 队列操作信息

  public class RabbitMqQueue
    {
        public static RabbitMqQueue Configure(Action<RabbitMqQueue> config)
        {
            var mg = new RabbitMqQueue(); 
            config?.Invoke(mg); 
            return mg;
        }

        /// <summary>
        /// 队列名称 个路由名称 通过;分割
        /// </summary>
        public string Name { get; set; } 


        /// <summary>
        /// 路由key 多个路由KEY 通过;分割
        /// </summary>
        public string RouteKey { get; set; }

        /// <summary>
        /// 是否持久化
        /// </summary>
        public bool Durable { get; set; } 

        /// <summary>
        /// 绑定参数
        /// </summary>
        public IDictionary<string, object> Args { get; set; }

        /// <summary>
        /// 路由交换机
        /// </summary>
        public string Exchange { get; set; }

        /// <summary>
        /// 路由交换机 类型
        /// </summary>
        public string ExchangeType { get; set; }
    }

3 通过工厂创建RabbitMq链接对象

  public class RabbitMqConnectorFactory
    {
        public static IConnection GetConnector(RabbitMqOptions options)
        {
            if (options == null)
                throw new ArgumentNullException("请先配置RabbitMq连接参数.");

            if (options.Connector == null)
                throw new ArgumentNullException("配置的RabbitMq连接对象不能为空.");

            if (options.Host == null || options.Host?.Count==0)
                throw new ArgumentNullException("请至少配置一个RabbitMq主机连接对象.");

            return options.Host.Count == 1 ? 
                    GetConnection(options) : 
                    GetClusterConnection(options);
        }

        /// <summary>
        /// 获取单个RabbitMQ连接
        /// </summary>
        /// <returns></returns>
        public static IConnection GetConnection(RabbitMqOptions options)
        {
            var host = options.Host.FirstOrDefault();
            var factory = new ConnectionFactory
            {
                HostName = host.Host, //ip
                Port = host.Port, // 端口
                UserName = options.Connector.UserName, // 账户
                Password = options.Connector.Password, // 密码
                VirtualHost = options.Connector.VirtualHost   // 虚拟主机
            };

            return factory.CreateConnection();
        }

        /// <summary>
        /// 获取集群连接对象
        /// </summary>
        /// <returns></returns>
        public static IConnection GetClusterConnection(RabbitMqOptions options)
        {
            var factory = new ConnectionFactory
            {
                UserName = options.Connector.UserName, // 账户
                Password = options.Connector.Password, // 密码
                VirtualHost = options.Connector.VirtualHost   // 虚拟主机
            };
            var list = options.Host.Select(s => new AmqpTcpEndpoint
            {
                HostName = s.Host,
                Port = s.Port,
            });

            return factory.CreateConnection(list.ToList());
        }
    }

4 组装 声明RabbitMq 基本信息

队列、绑定、交换机声明

public class RabbitMqBuilder
    {
        /// <summary>
        /// 交换机或路由定义
        /// </summary>
        public static event Func<RabbitMqQueue, IModel> ExchangeWithQueueDeclare;

        public static IModel Build(IModel channel, RabbitMqQueue mg)
        {
            ExchangeWithQueueDeclare += channel.WithExchange;
            ExchangeWithQueueDeclare += channel.WithQueueDeclare;
            ExchangeWithQueueDeclare += channel.WithQueueBind;

            return ExchangeWithQueueDeclare.Invoke(mg);
        }
    }

    public static class IModelExtension
    {
        public static IModel WithExchange(this IModel channel, RabbitMqQueue mg)
        {
            if (!mg.Exchange.IsNullOrEmpty() && !mg.ExchangeType.IsNullOrEmpty())
                channel.ExchangeDeclare(mg.Exchange, mg.ExchangeType);

            return channel;
        }

        public static IModel WithQueueDeclare(this IModel channel, RabbitMqQueue mg)
        {
            var queues = mg.Name.Split(",");
            foreach (var name in queues)
            {
                channel.QueueDeclare(name, false, false, false, mg.Args);
            }
            return channel;
        }

        public static IModel WithQueueBind(this IModel channel, RabbitMqQueue mg)
        {
            if (!mg.Exchange.IsNullOrEmpty())
            {
                var queues = mg.Name.Split(";");
                var routes = mg.RouteKey.Split(";");

                var isNeedRouteKey = mg.ExchangeType == ExchangeType.Direct || mg.Exchange == ExchangeType.Topic;
                if (isNeedRouteKey && queues.Length != routes.Length)
                {
                    throw new ArgumentException("路由数量和路由键数量不相等.");
                }

                for (int i = 0, len = queues.Length; i < len; i++)
                {
                    if (isNeedRouteKey)
                    {
                        channel.QueueBind(queue: queues[i], exchange: mg.Exchange, routingKey: routes[i]);
                    }
                    else
                    {
                        channel.QueueBind(queue: queues[i], exchange: mg.Exchange, routingKey: "");
                    }
                } 
            }
            return channel;
        }
    }

5 核心操作RabbitMq类

发送消息和接受消息

 public class RabbitMqCore
    {
        private static readonly IDictionary<string, IConnection> ConnectionCache = new Dictionary<string, IConnection>();
        private readonly SemaphoreSlim _connectLock = new SemaphoreSlim(1, 1);

        private volatile IConnection _connection;

        public RabbitMqQueue RabbitMqueue { get; set; }

        public RabbitMqCore() 
        {
            _connection = Connect();
        }


        public RabbitMqCore(Action<RabbitMqQueue> config)
        {
            RabbitMqueue = new RabbitMqQueue();
            config?.Invoke(RabbitMqueue); 
            _connection = Connect();
        }

        /// <summary>
        /// 生产端 -- 每次使用释放对象
        /// </summary>
        /// <param name="mg">队列名称</param>
        /// <param name="action">执行动作</param>
        public void AMQP(Action<IModel,RabbitMqQueue> action)
        {
            Check.NotNull(action, nameof(action));

            using (var channel = RabbitMqBuilder.Build(_connection.CreateModel(), this.RabbitMqueue))
            {
                action?.Invoke(channel,this.RabbitMqueue);
            }
        }

        /// <summary>
        /// 消费端--不释放对象
        /// </summary> 
        /// <param name="action">执行动作</param>
        public void AMQPC(Action<IModel,RabbitMqQueue> action)
        { 
            Check.NotNull(action, nameof(action));

            var channel = RabbitMqBuilder.Build(_connection.CreateModel(), this.RabbitMqueue);
            {
                action?.Invoke(channel,this.RabbitMqueue);
            }
        }

        #region 私有方法
        public IConnection Connect()
        {
            var options = GetOptions();
            if (options == null || options.Connector == null)
                throw new ArgumentNullException("请先配置RabbitMq连接参数");

            _connectLock.Wait();
            try
            {
                if (ConnectionCache.TryGetValue(options.Connector.VirtualHost, out IConnection connection))
                {
                    return connection;
                }

                connection = RabbitMqConnectorFactory.GetConnector(options);
                ConnectionCache[options.Connector.VirtualHost] = connection;
                return connection;
            }
            finally
            {
                _connectLock.Release();
            }
        }

        private RabbitMqOptions GetOptions()
        {
            RabbitMqOptions options = new RabbitMqOptions();
            return AppSettingsReader.GetInstance("Neter:RabbitMq", options);
        }
        #endregion
    }
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容