using ChatMgr.Logger;
using ChatMgr.Model;
using System;
using System.Collections.Generic;
using System.Data;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace ChatMgr.Biz
{
/// <summary>
/// 负责将历史消息批量入库及分表的逻辑
/// </summary>
public class Processor
{
public void Run()
{
DateTime now = DateTime.Now;
while (true)
{
//1.0 开启定时器每个5秒钟访问历史队列中的数据
if ((DateTime.Now - now).TotalSeconds > 5)
{
try
{
//2.0 从历史队列中获取消息数据
var historyMsgList = HistoryMsgManager.GetList();
LogHelper.WriteLog("历史记录总条数=" + historyMsgList.Count);
if (historyMsgList.Any())
{
//3.0 判断当前ChatMsg表中的数据是否超出了设定的临界值 5条
//3.0.1 获取当前使用表的名称
CrmChat14Entities db = new CrmChat14Entities();
string currentTableName = db.RouteMsgTable.ToList().LastOrDefault().TableName;
//3.0.2 获取currentTableName表中的总条数据
int rowCount = db.Database.SqlQuery<int>("select count(1) from " + currentTableName).FirstOrDefault();
LogHelper.WriteLog("当前表名称=" + currentTableName + " 中的总条数=" + rowCount);
//3.0.3 判断总条数是否超出了临界值5
if (rowCount > 5)
{
//3.0.4 如果超出了则重新建立一个物理表来存放数据
//1 创建一个新的物理表,2 将新的物理表名称 在 RouteMsgTable中管理起来 3 将RouteMsgTable的前一条记录的EndTime更新
//开启分布式事务来调用Usp_CreateNewTable20即可
int newTableIndex = int.Parse(currentTableName.Replace("ChatMsg", "")) + 1;
string procFmt = "Usp_CreateNewTable20 {0},'{1}'";
using (System.Transactions.TransactionScope scop = new System.Transactions.TransactionScope())
{
db.Database.ExecuteSqlCommand(string.Format(procFmt, newTableIndex, historyMsgList.FirstOrDefault().SendTime));
scop.Complete();
}
//向新表中插入数据
WriteToServer(historyMsgList, "ChagMsg" + newTableIndex);
}
else
{
//3.0.5 如果没有超出临界值则继续向这个表插入数据
WriteToServer(historyMsgList, currentTableName);
}
}
//重置计时器
now = DateTime.Now;
}
catch (Exception ex)
{
//使用Log4Net.dll将日志记录下来,方便查看系统的运行
LogHelper.WriteErrorLog(ex.ToString(), ex);
}
}
//切记注意:让线程休眠一定(>20ms)的时间,让CPU有足够的切换时间,防止CPU执行到达100%,降低系统性能
System.Threading.Thread.Sleep(1000);
}
}
/// <summary>
/// 利用SqlBulkCopy将数据批量入库
/// </summary>
/// <param name="tableName">ChatMsg0-N</param>
void WriteToServer(List<ChatMsg> list, string tableName)
{
DataTable dt = new DataTable();
//向dt中添加列
dt.Columns.Add("ToUserId", typeof(int));
dt.Columns.Add("ToRealName", typeof(string));
dt.Columns.Add("FromUserId", typeof(int));
dt.Columns.Add("FromRealName", typeof(string));
dt.Columns.Add("MessageBody", typeof(string));
dt.Columns.Add("SendTime", typeof(DateTime));
//将list中的数据转换成dt
DataRow row;
foreach (var item in list)
{
//按照dt的表结构实例化一个datarow
row = dt.NewRow();
row["ToUserId"] = item.ToUserId;
row["ToRealName"] = item.ToRealName;
row["FromUserId"] = item.FromUserId;
row["FromRealName"] = item.FromRealName;
row["MessageBody"] = item.MessageBody;
row["SendTime"] = item.SendTime;
dt.Rows.Add(row);
}
string connString = System.Configuration.ConfigurationManager.ConnectionStrings["sqlbulkcopyconnstring"].ConnectionString;
//1.0 将list集合中的数据转换到datatable中
using (System.Data.SqlClient.SqlBulkCopy copy = new System.Data.SqlClient.SqlBulkCopy(connString))
{
//指定将数据写入到哪个表中
copy.DestinationTableName = tableName;
//将内存表中的列名称与物理表中的列名称一一映射
copy.ColumnMappings.Add("ToUserId", "ToUserId");
copy.ColumnMappings.Add("ToRealName", "ToRealName");
copy.ColumnMappings.Add("FromUserId", "FromUserId");
copy.ColumnMappings.Add("FromRealName", "FromRealName");
copy.ColumnMappings.Add("MessageBody", "MessageBody");
copy.ColumnMappings.Add("SendTime", "SendTime");
//批量插入数据
copy.WriteToServer(dt);
}
}
}
}