ceph对象存储Bucket Notification功能使用

本篇文章描述了如何让ceph的bucket notification功能结合rabbitmq组件跑起来的流程。

数据存储在存储服务中如何让上层业务进行感知。在海量数据存储的情况下,靠应用去查找、比对显然低效甚至不现实。

Ceph的对象存储提供了桶通知的功能(Bucket Notification),其他一些存储比如MinIO、各家公有云产品上也有类似的功能。该功能在人工智能/机器学习、边缘计算领域工作流上有重要的应用,比如利用桶的通知功能去触发Serverless Function服务(AWS的Lambda),成为工作流上重要的一环。典型的例子图片识别:
应用抓取图片 → 上传至桶 → 桶产生事件信息 → 传到HTTP endpoint → 触发Web应用 → 图像处理算法 → 获取桶中图片信息 → 推理结果。

产生的消息可以推送(push模式)到ceph之外的组件,如http服务、支持AMQP0.9.1协议的消息队列组件或者Kafka上(目前来看MinIO兼容的endpoint服务、协议更多,包括ElasticSearch、Redis、MQTT等)。

Ceph对象存储还提供了一种拉取(pull)的模式,即pubsub同步模块,利用zone之间的同步机制。在使用时需要新建一个zone,pubsub的zone持久化事件至集群,事件将会持久化至特殊的桶(即subscription)。同时,相应地也会产生zone之间同步的负载。

一、基础组件

1、存储集群(先前搭建)。大前提是有一个ceph集群并且提供对象存储服务:)
2、消息队列Rabbitmq服务(文中介绍)
3、消费者程序(文中介绍)

安装rabbitmq

在CentOS 8上面安装。先安装包含Rabbitmq-server的软件源

yum install centos-release-rabbitmq-38.noarch

再安装rabbitmq-server服务。

yum install rabbitmq-server

启动服务

安装完成之后启动rabbitmq-server服务。两种方式任选:
1、直接启动

rabbitmq-server

2、使用systemctl启动

systemctl start rabbitmq-server

完成服务启动之后,可以查看rabbitmq的状态

rabbitmqctl status

页面服务

rabbitmq提供了页面服务。需要先开启相应的插件。执行命令如下:

rabbitmq-plugins enable rabbitmq_management

重启rabbitmq服务之后,在浏览器中输入网址。10.101.17.13为rabbitmq所在服务器的IP地址,15672为服务绑定的端口。

http://10.101.17.13:15672

并使用默认账号登录,用户名/密码如下:

guest / guest

二、 配置ceph集群

以下操作将在Ceph集群上完成。

  • 1、确保Bucket Notification操作API开启;
  • 2、创建Topic;
  • 3、创建桶通知配置。
    官网说明也有描述,但是操作性不强,需要参考一些邮件列表的信息才能获得整个过程的完整面貌。

确定rgw服务开启

在rgw服务上查看Bucket Notification的API是否开启。开启API才能进行后续的操作。查看方式使用ceph daemon查看对应的字段。

ceph daemon /var/run/ceph/ceph-client.rgw.XXXXX.asok config show | grep rgw_enable_apis

返回结果里面有notifications字段即可。

创建topic

操作bucket notification API可以用Postman或者直接shell的curl命令发送HTTP请求,也可以通过工具发送。awscli是用python boto3库实现的管理工具。以下操作基于awscli。安装awscli工具:

yum install awscli

使用前需要对用户、密钥等对象存储访问基础信息进行配置。

aws configure 

awscli的一些案例可以在awscli案例中找到,很有帮助。
数字签名上的设置,否则会返回签名因为不同版本差异会报有误。

aws configure set default.sns.signature_version s3

创建topic

aws --region=default --endpoint-url http://10.101.17.45:7999 sns create-topic --name=mytopic --attributes='{"push-endpoint": "amqp://10.101.17.13:5672", "amqp-exchange": "ex1", "amqp-ack-level": "broker"}'

其中10.101.17.45:7999是rgw对外提供的服务地址,mytopic为新创建topic的名称。amqp://10.101.17.13:5672是rabbitmq的IP和端口,这里如果不带user和passwd默认是guest/guest,vhost默认是/。ex1为rabbitmq上exchange的名称。

返回结果表明创建成功

{
    "TopicArn": "arn:aws:sns:default::mytopic"
}

该结果的固定格式如下:

arn:aws:sns:<zone-group>:<tenant>:<topic>

查看集群topic方式

radosgw-admin topic list

查看topic的配置

aws --region=default --endpoint-url http://10.101.17.45:7999 sns get-topic-attributes --topic-arn="arn:aws:sns:default::mytopic"

创建通知配置

有了topic之后需要指定对应的桶通知哪些内容。需要指定一个已经存在的桶(例子中是testwxc),设定桶通知的配置,如ID、对应的topic、事件和信息过滤器。

aws --region=default --endpoint-url http://10.101.17.45:7999 s3api put-bucket-notification-configuration --bucket=testwxc --notification-configuration='{"TopicConfigurations": [{"Id": "notif1", "TopicArn": "arn:aws:sns:default::mytopic", "Events": ["s3:ObjectCreated:*", "s3:ObjectRemoved:*"], "Filter": {"Key": {"FilterRules": [{"Name": "regex", "Value": "([a-z]+)"}]}}}]}'

查看已经成功设置的内容

aws --region=default --endpoint-url http://10.101.17.45:7999  s3api get-bucket-notification-configuration --bucket=testwxc 

三、事件获取

完成RGW服务侧的配置之后,使用go语言写一个接收程序(消费者)获取推送的通知信息。需要使用到amqp的库。获取该库命令:

go get github/streadway/amqp

使用go get获取慢的情况,可以参考这篇文章:解决go get下载慢的方法,亲测有效。
代码完成流程:

  • 1、连接rabbitmq-server;
  • 2、获得Channel;
  • 3、定义获取Exchange、Queue;
  • 4、将队列和Routing key绑定;
  • 5、消费Queue中的内容。
package main

import (
    "log"
    "github.com/streadway/amqp"
)

func quitOnError(err error) {
        if err != nil {
                log.Fatal(err)
        }
}

func main() {
        conn, err := amqp.Dial("amqp://guest:guest@10.101.17.13:5672/")
        quitOnError(err)
        defer conn.Close()

        ch, err := conn.Channel()
        quitOnError(err)
        defer ch.Close()

        err = ch.ExchangeDeclare(
                "ex1",   // exchange的名称
                "topic", // exchange的类型
                true,     // 是否持久化(durable)
                false,    // auto-deleted
                false,    // internal
                false,    // no-wait
                nil,      // arguments
        )
        quitOnError(err)

        queue, err := ch.QueueDeclare(
                "",    // 队列名称,默认不填写
                true,  //是否持久化
                false, // delete when usused
                true,  // exclusive
                false, // no-wait
                nil,   // arguments
        )
        quitOnError(err)

        routingKey := "mytopic"

        err = ch.QueueBind(
                queue.Name,   // 队列名称
                routingKey,   // 指定routing key
                "ex1",        //  指定exchange的名称
                false,
                nil,
        )
        quitOnError(err)

        msgs, err := ch.Consume(
                queue.Name, // 队列名称
                "",     // consumer
                true,   // auto-ack
                false,  // exclusive
                false,  // no-local
                false,  // no-wait
                nil,    // args
        )
        quitOnError(err)

        run := make(chan bool)

        go func() {
                for item := range msgs {
                        log.Printf(" %s", item.Body)
                }
        }()

        log.Printf("Waiting for messages.  Press CTRL+C to quit\n")
        <-run
}

运行之后通过s3cmd上传对象,可以获取到对象创建的消息。

四、过程中遇到的问题

1、没有创建exchange

即便有操作,但是没有获得通知信息。查看ceph rgw服务日志看到如下字段

connection (10.101.17.13:5672/) retry failed. error: RGW_AMQP_STATUS_VERIFY_EXCHANGE_FAILED

原因:ceph rgw服务本身并不创建exchange需要在rabbitmq上进行创建。需要在rabbitmq页面服务exchange的tab页中进行创建。如下图所示:

rabbitmq页面上创建exchange

创建之后报错会消失。

2、消息没有被路由

查看ceph rgw服务日志看到如下字段

AMQP run: message was not routable
AMQP run: ignoring non n/ack messages. frame type: 2
AMQP run: ignoring non n/ack messages. frame type: 3

查看了一下代码src/rgw/rgw_amqp.cc

日志所关联的代码

注释表明该消息被返回给发送者。

原因:ceph rgw产生的消息到达exchange之后,并不知道消息应该推到哪个queue,即表示不能routable。rabbitmq要通过routingkey将exchange的信息分发到queue中。

通过rabbitmq官方教程中的图就能更为清楚的说明消息队列的结构。其中routes根据routkeying发送到队列中。

rabbitmq消息队列模型

但是,routing key是什么?这也是我花了很久没有走通的地方。回过头来看官方文档已经说明了这一点。

routing key是什么

翻译过来“create topic使用的名称是exchange向queue分发消息时的routing key”。

五、 其他常用的操作

关闭rabbitmq-server

关闭rabbitmq的命令

rabbitmqctl stop

删除已经创建的topic

使用awscli

aws --region=default --endpoint-url http://10.101.17.45:7999 sns delete-topic --topic-arn "arn:aws:sns:default::mytopic"

该操作需要aws配置的用户拥有删除的能力(即op_mask拥有delete)。

或者通过radosgw-admin的命令:

radosgw-admin topic rm  --topic mytopic

查看是否消息被正常发送

通过ceph daemon可以查看消息发送成功和失败的统计,即pubsub_push_okpubsub_push_failed等参数,这些参数Bucket Notification是和PubSub模块合用。

六、写在最后

官方文档使用时候可能缺少一些步骤,造成功能无法跑起来。这个时候无比信服“便宜的才是最贵的”这句话。开源软件看上去功能都有,好不好用是另外一回事儿。能跑起来也需要花点时间。

在邮件列表中能找到最为有用的讨论:S3 Bucket Notification requirement

另外,还有ceph Bucket Notification作者的功能介绍PubSub and Notifications in Ceph

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,185评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,445评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,684评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,564评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,681评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,874评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,025评论 3 408
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,761评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,217评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,545评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,694评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,351评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,988评论 3 315
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,778评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,007评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,427评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,580评论 2 349

推荐阅读更多精彩内容