Git 地址 https://gitee.com/delaywu/neter.git
查看具体用法
1 创建 RabbitMq 连接配置类
/// <summary>
/// Root RabbitMQ settings object. According to the original note on this type,
/// it is read from the "Neter:RabbitMq" configuration node by default.
/// </summary>
public class RabbitMqOptions
{
    /// <summary>
    /// Broker endpoints. One entry describes a single broker; several entries
    /// describe the members of a cluster.
    /// </summary>
    public List<RabbitMqHostOptions> Host { get; set; }

    /// <summary>
    /// Credentials and virtual host shared by every endpoint listed in <see cref="Host"/>.
    /// </summary>
    public RabbitMqConnectorOptions Connector { get; set; }
}
/// <summary>
/// Login credentials and virtual host used when opening a RabbitMQ connection.
/// </summary>
public class RabbitMqConnectorOptions
{
    /// <summary>
    /// Virtual host to connect to. Per the original note, connection objects
    /// are cached per virtual host.
    /// </summary>
    public string VirtualHost { get; set; }

    /// <summary>
    /// Account name used to log in.
    /// </summary>
    public string UserName { get; set; }

    /// <summary>
    /// Password used to log in.
    /// </summary>
    public string Password { get; set; }
}
/// <summary>
/// Address of a single broker node; a list of these describes a cluster.
/// </summary>
public class RabbitMqHostOptions
{
    /// <summary>
    /// TCP port of the broker node.
    /// </summary>
    public int Port { get; set; }

    /// <summary>
    /// Host name or IP address of the broker node.
    /// </summary>
    public string Host { get; set; }
}
2 定义 RabbitMq 队列操作信息
/// <summary>
/// Describes the queue(s), routing key(s) and exchange that an operation works with.
/// </summary>
public class RabbitMqQueue
{
    /// <summary>
    /// Queue name. Several queue names are written as one string separated by ';'.
    /// </summary>
    public string Name { get; set; }

    /// <summary>
    /// Routing key. Several routing keys are written as one string separated by ';'.
    /// </summary>
    public string RouteKey { get; set; }

    /// <summary>
    /// Durability flag.
    /// </summary>
    public bool Durable { get; set; }

    /// <summary>
    /// Optional declaration/binding arguments.
    /// </summary>
    public IDictionary<string, object> Args { get; set; }

    /// <summary>
    /// Name of the exchange used for routing.
    /// </summary>
    public string Exchange { get; set; }

    /// <summary>
    /// Type of the exchange used for routing.
    /// </summary>
    public string ExchangeType { get; set; }

    /// <summary>
    /// Creates a new <see cref="RabbitMqQueue"/> and lets the caller populate it.
    /// </summary>
    /// <param name="config">Callback that fills in the new instance; may be null.</param>
    /// <returns>The newly created (and possibly configured) instance.</returns>
    public static RabbitMqQueue Configure(Action<RabbitMqQueue> config)
    {
        var queue = new RabbitMqQueue();
        if (config != null)
        {
            config(queue);
        }

        return queue;
    }
}
3 通过工厂创建 RabbitMq 连接对象
/// <summary>
/// Creates RabbitMQ connections from a <see cref="RabbitMqOptions"/> instance.
/// </summary>
public class RabbitMqConnectorFactory
{
    /// <summary>
    /// Opens a connection, choosing the single-host or the multi-host (cluster)
    /// path depending on how many hosts are configured.
    /// </summary>
    /// <param name="options">Connection settings; must contain credentials and at least one host.</param>
    /// <returns>The connection returned by the RabbitMQ client.</returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="options"/>, its connector, its host list or one of the hosts is missing.
    /// </exception>
    public static IConnection GetConnector(RabbitMqOptions options)
    {
        EnsureValid(options);
        return options.Host.Count == 1
            ? GetConnection(options)
            : GetClusterConnection(options);
    }

    /// <summary>
    /// Opens a connection to the first configured host.
    /// </summary>
    /// <param name="options">Connection settings; must contain credentials and at least one host.</param>
    /// <returns>The connection returned by the RabbitMQ client.</returns>
    /// <exception cref="ArgumentNullException">The settings are incomplete.</exception>
    public static IConnection GetConnection(RabbitMqOptions options)
    {
        // This method is public, so it validates on its own instead of relying on GetConnector.
        EnsureValid(options);

        var host = options.Host[0];
        var factory = new ConnectionFactory
        {
            HostName = host.Host,
            Port = host.Port,
            UserName = options.Connector.UserName,
            Password = options.Connector.Password,
            VirtualHost = options.Connector.VirtualHost
        };
        return factory.CreateConnection();
    }

    /// <summary>
    /// Opens a connection by handing every configured host to the client as a candidate endpoint.
    /// </summary>
    /// <param name="options">Connection settings; must contain credentials and at least one host.</param>
    /// <returns>The connection returned by the RabbitMQ client.</returns>
    /// <exception cref="ArgumentNullException">The settings are incomplete.</exception>
    public static IConnection GetClusterConnection(RabbitMqOptions options)
    {
        EnsureValid(options);

        var factory = new ConnectionFactory
        {
            UserName = options.Connector.UserName,
            Password = options.Connector.Password,
            VirtualHost = options.Connector.VirtualHost
        };
        var endpoints = options.Host
            .Select(h => new AmqpTcpEndpoint
            {
                HostName = h.Host,
                Port = h.Port,
            })
            .ToList();
        return factory.CreateConnection(endpoints);
    }

    /// <summary>
    /// Throws when the settings lack credentials or hosts. The exception type is kept as
    /// <see cref="ArgumentNullException"/> for existing callers, but the text is now passed
    /// as the message rather than as the parameter name.
    /// </summary>
    private static void EnsureValid(RabbitMqOptions options)
    {
        if (options == null)
            throw new ArgumentNullException(nameof(options), "请先配置RabbitMq连接参数.");
        if (options.Connector == null)
            throw new ArgumentNullException(nameof(options), "配置的RabbitMq连接对象不能为空.");
        if (options.Host == null || options.Host.Count == 0 || options.Host.Any(h => h == null))
            throw new ArgumentNullException(nameof(options), "请至少配置一个RabbitMq主机连接对象.");
    }
}
4 组装并声明 RabbitMq 基本信息
包括交换机声明、队列声明和队列绑定
/// <summary>
/// Applies the exchange declaration, queue declaration and queue binding
/// described by a <see cref="RabbitMqQueue"/> to a channel.
/// </summary>
public class RabbitMqBuilder
{
    /// <summary>
    /// Optional extra declaration steps registered by callers. Handlers registered here
    /// run before the built-in steps on every <see cref="Build"/> call.
    /// </summary>
    public static event Func<RabbitMqQueue, IModel> ExchangeWithQueueDeclare;

    /// <summary>
    /// Declares the exchange and queue(s) and binds them on <paramref name="channel"/>.
    /// </summary>
    /// <param name="channel">Channel the declarations are issued on.</param>
    /// <param name="mg">Queue/exchange description.</param>
    /// <returns>The same <paramref name="channel"/> instance.</returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="channel"/> or <paramref name="mg"/> is null.
    /// </exception>
    public static IModel Build(IModel channel, RabbitMqQueue mg)
    {
        if (channel == null)
            throw new ArgumentNullException(nameof(channel));
        if (mg == null)
            throw new ArgumentNullException(nameof(mg));

        // Only externally registered handlers live on the static event. The built-in steps
        // are no longer subscribed with += on each call: doing so kept every channel ever
        // passed in alive and re-ran the declarations on all of them on each Build.
        ExchangeWithQueueDeclare?.Invoke(mg);

        channel.WithExchange(mg);
        channel.WithQueueDeclare(mg);
        channel.WithQueueBind(mg);
        return channel;
    }
}
/// <summary>
/// Channel extension methods that declare the exchange, declare the queue(s)
/// and bind them, based on a <see cref="RabbitMqQueue"/> description.
/// </summary>
public static class IModelExtension
{
    // ';' is the documented separator for queue names. ',' is still accepted because
    // WithQueueDeclare used to split on ',' and existing callers may rely on it.
    private static readonly char[] QueueNameSeparators = { ';', ',' };

    // Routing keys are separated by ';' only.
    private static readonly char[] RouteKeySeparators = { ';' };

    /// <summary>
    /// Declares the exchange when both its name and its type are set; otherwise does nothing.
    /// </summary>
    /// <param name="channel">Channel to declare on.</param>
    /// <param name="mg">Queue/exchange description.</param>
    /// <returns>The same <paramref name="channel"/>.</returns>
    public static IModel WithExchange(this IModel channel, RabbitMqQueue mg)
    {
        if (!string.IsNullOrEmpty(mg.Exchange) && !string.IsNullOrEmpty(mg.ExchangeType))
        {
            channel.ExchangeDeclare(mg.Exchange, mg.ExchangeType);
        }

        return channel;
    }

    /// <summary>
    /// Declares every queue listed in <see cref="RabbitMqQueue.Name"/>.
    /// </summary>
    /// <param name="channel">Channel to declare on.</param>
    /// <param name="mg">Queue/exchange description.</param>
    /// <returns>The same <paramref name="channel"/>.</returns>
    /// <exception cref="ArgumentException"><see cref="RabbitMqQueue.Name"/> is null.</exception>
    public static IModel WithQueueDeclare(this IModel channel, RabbitMqQueue mg)
    {
        foreach (var name in SplitQueueNames(mg))
        {
            // Honors mg.Durable; the flag was previously hard-coded to false.
            channel.QueueDeclare(name, mg.Durable, false, false, mg.Args);
        }

        return channel;
    }

    /// <summary>
    /// Binds every queue in <see cref="RabbitMqQueue.Name"/> to the exchange. For direct and
    /// topic exchanges the i-th queue is bound with the i-th routing key; for any other
    /// exchange type an empty routing key is used. Does nothing when no exchange is set.
    /// </summary>
    /// <param name="channel">Channel to bind on.</param>
    /// <param name="mg">Queue/exchange description.</param>
    /// <returns>The same <paramref name="channel"/>.</returns>
    /// <exception cref="ArgumentException">
    /// <see cref="RabbitMqQueue.Name"/> is null, or the number of routing keys differs from
    /// the number of queues for a direct/topic exchange.
    /// </exception>
    public static IModel WithQueueBind(this IModel channel, RabbitMqQueue mg)
    {
        if (string.IsNullOrEmpty(mg.Exchange))
        {
            return channel;
        }

        var queues = SplitQueueNames(mg);

        // Compare the exchange *type* in both operands; the second operand used to
        // compare the exchange name, so topic exchanges were never detected.
        var needsRouteKey = mg.ExchangeType == ExchangeType.Direct
                            || mg.ExchangeType == ExchangeType.Topic;

        if (!needsRouteKey)
        {
            // RouteKey is not read here, so it may be left null for these exchanges.
            foreach (var queue in queues)
            {
                channel.QueueBind(queue: queue, exchange: mg.Exchange, routingKey: "");
            }

            return channel;
        }

        var routes = mg.RouteKey == null
            ? new string[0]
            : mg.RouteKey.Split(RouteKeySeparators);
        if (queues.Length != routes.Length)
        {
            throw new ArgumentException("路由数量和路由键数量不相等.");
        }

        for (var i = 0; i < queues.Length; i++)
        {
            channel.QueueBind(queue: queues[i], exchange: mg.Exchange, routingKey: routes[i]);
        }

        return channel;
    }

    /// <summary>
    /// Splits <see cref="RabbitMqQueue.Name"/> into individual queue names so that
    /// declaring and binding always see the same list.
    /// </summary>
    private static string[] SplitQueueNames(RabbitMqQueue mg)
    {
        if (mg.Name == null)
        {
            throw new ArgumentException("RabbitMqQueue.Name must not be null.", nameof(mg));
        }

        return mg.Name.Split(QueueNameSeparators);
    }
}
5 RabbitMq 核心操作类
用于发送消息和接收消息
/// <summary>
/// Entry point for producing and consuming messages. Connections are cached per
/// virtual host and shared by all instances; channels are created per call.
/// </summary>
public class RabbitMqCore
{
    private static readonly IDictionary<string, IConnection> ConnectionCache = new Dictionary<string, IConnection>();

    // The cache is static, so the lock guarding it must be static as well. A per-instance
    // lock would let two instances mutate the dictionary at the same time.
    private static readonly object ConnectionCacheLock = new object();

    private readonly IConnection _connection;

    /// <summary>
    /// Queue/exchange description used by <see cref="AMQP"/> and <see cref="AMQPC"/>.
    /// </summary>
    public RabbitMqQueue RabbitMqueue { get; set; }

    /// <summary>
    /// Connects using the configured options. <see cref="RabbitMqueue"/> is left unset
    /// and must be assigned before calling <see cref="AMQP"/> or <see cref="AMQPC"/>.
    /// </summary>
    public RabbitMqCore()
    {
        _connection = Connect();
    }

    /// <summary>
    /// Connects using the configured options and builds <see cref="RabbitMqueue"/>
    /// through <paramref name="config"/>.
    /// </summary>
    /// <param name="config">Callback that fills in the queue description; may be null.</param>
    public RabbitMqCore(Action<RabbitMqQueue> config)
    {
        RabbitMqueue = new RabbitMqQueue();
        config?.Invoke(RabbitMqueue);
        _connection = Connect();
    }

    /// <summary>
    /// Producer side: opens a channel, applies the declarations, runs
    /// <paramref name="action"/> and then disposes the channel.
    /// </summary>
    /// <param name="action">Work to perform with the prepared channel.</param>
    /// <exception cref="InvalidOperationException"><see cref="RabbitMqueue"/> has not been set.</exception>
    public void AMQP(Action<IModel, RabbitMqQueue> action)
    {
        Check.NotNull(action, nameof(action));
        var queue = RequireQueue();

        // The channel is created inside the using so it is also disposed when Build throws.
        using (var channel = _connection.CreateModel())
        {
            RabbitMqBuilder.Build(channel, queue);
            action?.Invoke(channel, queue);
        }
    }

    /// <summary>
    /// Consumer side: opens a channel, applies the declarations and runs
    /// <paramref name="action"/>. The channel is intentionally left open afterwards.
    /// </summary>
    /// <param name="action">Work to perform with the prepared channel.</param>
    /// <exception cref="InvalidOperationException"><see cref="RabbitMqueue"/> has not been set.</exception>
    public void AMQPC(Action<IModel, RabbitMqQueue> action)
    {
        Check.NotNull(action, nameof(action));
        var queue = RequireQueue();

        var channel = _connection.CreateModel();
        try
        {
            RabbitMqBuilder.Build(channel, queue);
        }
        catch
        {
            // The caller never saw this channel, so nobody else can release it.
            channel.Dispose();
            throw;
        }

        action?.Invoke(channel, queue);
    }

    #region Connection helpers
    /// <summary>
    /// Returns the cached connection for the configured virtual host, creating and
    /// caching it on first use.
    /// </summary>
    /// <returns>The shared connection for the configured virtual host.</returns>
    /// <exception cref="ArgumentNullException">No RabbitMQ options are configured.</exception>
    public IConnection Connect()
    {
        var options = GetOptions();
        if (options == null || options.Connector == null)
            throw new ArgumentNullException(nameof(options), "请先配置RabbitMq连接参数");

        // A dictionary key cannot be null; an unset virtual host gets its own cache slot.
        var cacheKey = options.Connector.VirtualHost ?? string.Empty;

        lock (ConnectionCacheLock)
        {
            if (ConnectionCache.TryGetValue(cacheKey, out IConnection connection))
            {
                return connection;
            }

            connection = RabbitMqConnectorFactory.GetConnector(options);
            ConnectionCache[cacheKey] = connection;
            return connection;
        }
    }

    /// <summary>
    /// Reads the options from the "Neter:RabbitMq" configuration node.
    /// </summary>
    private RabbitMqOptions GetOptions()
    {
        RabbitMqOptions options = new RabbitMqOptions();
        return AppSettingsReader.GetInstance("Neter:RabbitMq", options);
    }

    /// <summary>
    /// Returns <see cref="RabbitMqueue"/>, failing with a clear message when it was never set.
    /// </summary>
    private RabbitMqQueue RequireQueue()
    {
        var queue = this.RabbitMqueue;
        if (queue == null)
            throw new InvalidOperationException("RabbitMqueue must be set before calling AMQP or AMQPC.");
        return queue;
    }
    #endregion
}