如何利用.NETCore向Azure EventHubs批量发送数据?

最近在做一个基于Azure云的物联网分析项目:


image

.netcore采集程序向Azure事件中心(EventHubs)发送数据,通过Azure EventHubs Capture转储到Azure BlogStorage,供数据科学团队分析。

为什么使用Azure事件中心?

Azure事件中心是一种Azure上完全托管的实时数据摄取服务, 每秒可流式传输来自website、app、device任何源的数百万个事件。提供的统一流式处理平台和时间保留缓冲区,将事件生成者和事件使用者分开。

  • 事件生成者: 可使用https、AQMP协议发布事件
  • 分区:事件中心通过分区使用者模式提供消息流式处理功能,提高可用性和并行化
  • 事件接收者:所有事件中心使用者通过AMQP 1.0会话进行连接,读取数据


    image

例如,如果事件中心具有四个分区,并且其中一个分区要在负载均衡操作中从一台服务器移动到另一台服务器,则仍可以通过其他三个分区进行发送和接收。 此外,具有更多分区可以让更多并发读取器处理数据,从而提高聚合吞吐量。 了解分布式系统中分区和排序的意义是解决方案设计的重要方面。 为了帮助说明排序与可用性之间的权衡,请参阅 CAP 定理

最直观的方式:请在portal.azure.cn门户站点---->创建事件中心命名空间---> 创建事件中心


image

.NetCore 准实时批量发送数据到事件中心

.NET库 (Azure.Messaging.EventHubs)

我们使用Asp.NetCore以Azure App Service形式部署,依赖Azure App Service的自动缩放能录应对物联网的潮汐大流量。

通常推荐批量发送到事件中心,能有效增加web服务的吞吐量和响应能力。
目前新版SDk: Azure.Messaging.EventHubs仅支持分批发送。

  1. nuget上引入Azure.Messaging.EventHubs库
  2. EventHubProducerClient客户端负责分批发送数据到事件中心,根据发送时指定的选项,事件数据可能会自动路由到可用分区或发送到特定请求的分区。

在以下情况下,建议允许自动路由分区:
1) 事件的发送必须高度可用
2) 事件数据应在所有可用分区之间平均分配。
自动路由分区的规则:
1)使用循环法将事件平均分配到所有可用分区中
2)如果某个分区不可用,事件中心将自动检测到该分区并将消息转发到另一个可用分区。

我们要注意,根据选定的 命令空间定价层, 每批次发给事件中心的最大消息大小也不一样:


image

分段批量发送策略

这里我们就需要思考: web程序收集数据是以个数为单位; 但是我们分批发送时要根据分批的字节大小来切分。
我的方案是: 因引入TPL Dataflow 管道:

image

  1. web程序收到数据,立刻丢入TransformBlock<string, EventData>
  2. 转换到EventData之后,使用BatchBlock<EventData>按照个数打包
  3. 利用ActionBlock<EventData[]>在包内 累积指定字节大小批量发送
  • 最后我们设置一个定时器(5min),强制在BatchBlock的前置队列未满时打包,并发送。

核心的TPL Dataflow代码如下:

public class MsgBatchSender
    {
        private readonly EventHubProducerClient Client;
        private readonly TransformBlock<string, EventData> _transformBlock;
        private readonly BatchBlock<EventData> _packer;
        private readonly ActionBlock<EventData[]> _batchSender;

        private readonly DataflowOption _dataflowOption;
        private readonly Timer _trigger;
        private readonly ILogger _logger;

        public MsgBatchSender(EventHubProducerClient client, IOptions<DataflowOption> option,ILoggerFactory loggerFactory)
        {
            Client = client;
            _dataflowOption = option.Value;
            var dfLinkoption = new DataflowLinkOptions { PropagateCompletion = true };

            _transformBlock = new TransformBlock<string, EventData>(
                text => new EventData(Encoding.UTF8.GetBytes(text)),
                   new ExecutionDataflowBlockOptions
                   {
                       MaxDegreeOfParallelism = _dataflowOption.MaxDegreeOfParallelism
                   });
            _packer = new BatchBlock<EventData>(_dataflowOption.BatchSize);
            _batchSender = new ActionBlock<EventData[]>(msgs=> BatchSendAsync(msgs));
            _packer.LinkTo(_batchSender, dfLinkoption);

            _transformBlock.LinkTo(_packer, dfLinkoption, x => x != null);

            _trigger = new Timer(_ => _packer.TriggerBatch(), null, TimeSpan.Zero, TimeSpan.FromSeconds(_dataflowOption.TriggerInterval));

            _logger = loggerFactory.CreateLogger<DataTrackerMiddleware>();
        }

        private async Task BatchSendAsync(EventData[] msgs)
        {
            try
            {
                if (msgs != null)
                {
                    var i = 0;
                    while (i < msgs.Length)
                    {
                        var batch = await Client.CreateBatchAsync();
                        while (i < msgs.Length)
                        {
                            if (batch.TryAdd(msgs[i++]) == false)
                            {
                                break;
                            }
                        }
                        if(batch!= null && batch.Count>0)
                        {
                            await Client.SendAsync(batch);
                            batch.Dispose();
                        }
                    }
                }
            }
             catch (Exception ex)
            {
                // ignore and log any exception
                _logger.LogError(ex, "SendEventsAsync: {error}", ex.Message);
            }

        }

        public  async Task<bool> PostMsgsync(string txt)
        {
            return await _transformBlock.SendAsync(txt);
        }

        public async Task CompleteAsync()
        {
            _transformBlock.Complete();
            await _transformBlock.Completion;
            await _batchSender.Completion;
            await _batchSender.Completion;
        }
    }
image

低频的时候 定时器起作用,队列未满强制打包发送。

这样在高频的时候是接近实时,低频的时候最大延迟5分钟发送。

总结

  • Azure事件中心的基础用法
  • .NET Core准实时分批向Azure事件中心发送数据,其中用到的TPL Dataflow是以actor模型:提供了粗粒度的数据流和流水线任务,提高了高并发程序的健壮性。

源码地址:https://github.com/zaozaoniao/SaicEnergyTracker

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,335评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,895评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,766评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,918评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,042评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,169评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,219评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,976评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,393评论 1 304
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,711评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,876评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,562评论 4 336
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,193评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,903评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,142评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,699评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,764评论 2 351

推荐阅读更多精彩内容