直接不废话了,安装RabbitMQ的话我会再出一个文章,这里只介绍下在.NetCore下如何进行数据的播发和接收。
其实这个程序在之前的其他项目中使用过,是正在跑的程序。环境是ubuntu下rabbitmq和.net core配合使用。其实在ubuntu下rabbitmq有文件打开数的封禁,系统默认是1024,换算下来rabbitmq客户端的最大连接数为829,如果看官有更大的连接需求的话可以修改系统的文件打开数就可以了,如果你的需求是上万的,建议还是使用rabbitmq集群的好。这里的话我们只是进行了数据的高速播发,不敢说大话,每秒50个数据,829的连接数,数据实时播发,各个客户端无延迟及丢失,这个是测试过的。
话不多说,上代码:
接收端代码如下:
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Threading;
namespace Test_Hq
{
class Program
{
static string mqipaddres = "********";
static Int32 mqport = 5672;
static void Main(string[] args)
{
for(int i = 0; i < 10; i++)
{
Thread thread = new Thread(process);
thread.IsBackground = true;
thread.Start();
}
Console.ReadLine();
}
static void process()
{
ConnectionFactory factory = new ConnectionFactory() { HostName = mqipaddres, Port = mqport, UserName = "******", Password = "********" };
var connection = factory.CreateConnection();
var channel = connection.CreateModel();
channel.ExchangeDeclare(exchange: "Stock", type: "fanout");
var result = channel.QueueDeclare(exclusive: true);
string queue_name = result.QueueName;
channel.QueueBind(queue: queue_name, exchange: "Stock", "");
var consumer = new EventingBasicConsumer(channel);
channel.BasicConsume(queue: queue_name, autoAck: true, consumer: consumer);
consumer.Received += (model, ea) =>
{
var body = ea.Body;
这里该干嘛干嘛,如果要任务分发的话在这里重新把收到的数据委托出去处理,或者是用线程安全队列接收下,然后让工作线程进行处理,不建议在这里直接处理。
Console.WriteLine(Encoding.UTF8.GetString(body));
};
}
}
}
发送端代码如下:
using System;
using System.Collections.Generic;
using System.IO;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
namespace Stock_Hq
{
class Program
{
static string mqipaddres = "********";
static Int32 mqport = 5672;
static void Main(string[] args)
{
ConnectionFactory factory = new ConnectionFactory() { HostName = mqipaddres, Port = mqport, UserName = "*******", Password = "*******" };
var connection = factory.CreateConnection();
var channel = connection.CreateModel();
channel.ExchangeDeclare(exchange:"Stock",type: "fanout");
Console.CancelKeyPress += (object sender, ConsoleCancelEventArgs e) =>
{
//connection.Close();
System.Environment.Exit(0);
e.Cancel = true;
};
int i=0;
while (true)
{
i++;
Thread.Sleep(20);间隔设置,这里可以启动工作线程,从线程安全队列里面拿到所需数据,然后发送出去。
var sendbyte = Encoding.UTF8.GetBytes(i.ToString()+":"+DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss:ffff"));
channel.BasicPublish(exchange: "Stock", routingKey: "", body: sendbyte);
Console.WriteLine(i.ToString() + ":" + DateTime.Now.ToString());
}
}
}
}