本篇文章描述了如何让ceph的bucket notification功能结合rabbitmq组件跑起来的流程。
数据存储在存储服务中如何让上层业务进行感知。在海量数据存储的情况下,靠应用去查找、比对显然低效甚至不现实。
Ceph的对象存储提供了桶通知的功能(Bucket Notification),其他一些存储比如MinIO、各家公有云产品上也有类似的功能。该功能在人工智能/机器学习、边缘计算领域工作流上有重要的应用,比如利用桶的通知功能去触发Serverless Function服务(AWS的Lambda),成为工作流上重要的一环。典型的例子图片识别:
应用抓取图片 → 上传至桶 → 桶产生事件信息 → 传到HTTP endpoint → 触发Web应用 → 图像处理算法 → 获取桶中图片信息 → 推理结果。
产生的消息可以推送(push模式)到ceph之外的组件,如http服务、支持AMQP0.9.1协议的消息队列组件或者Kafka上(目前来看MinIO兼容的endpoint服务、协议更多,包括ElasticSearch、Redis、MQTT等)。
Ceph对象存储还提供了一种拉取(pull)的模式,即pubsub同步模块,利用zone之间的同步机制。在使用时需要新建一个zone,pubsub的zone持久化事件至集群,事件将会持久化至特殊的桶(即subscription)。同时,相应地也会产生zone之间同步的负载。
一、基础组件
1、存储集群(先前搭建)。大前提是有一个ceph集群并且提供对象存储服务:)
2、消息队列Rabbitmq服务(文中介绍)
3、消费者程序(文中介绍)
安装rabbitmq
在CentOS 8上面安装。先安装包含Rabbitmq-server的软件源
yum install centos-release-rabbitmq-38.noarch
再安装rabbitmq-server服务。
yum install rabbitmq-server
启动服务
安装完成之后启动rabbitmq-server服务。两种方式任选:
1、直接启动
rabbitmq-server
2、使用systemctl启动
systemctl start rabbitmq-server
完成服务启动之后,可以查看rabbitmq的状态
rabbitmqctl status
页面服务
rabbitmq提供了页面服务。需要先开启相应的插件。执行命令如下:
rabbitmq-plugins enable rabbitmq_management
重启rabbitmq服务之后,在浏览器中输入网址。10.101.17.13为rabbitmq所在服务器的IP地址,15672为服务绑定的端口。
http://10.101.17.13:15672
并使用默认账号登录,用户名/密码如下:
guest / guest
二、 配置ceph集群
以下操作将在Ceph集群上完成。
- 1、确保Bucket Notification操作API开启;
- 2、创建Topic;
- 3、创建桶通知配置。
官网说明也有描述,但是操作性不强,需要参考一些邮件列表的信息才能获得整个过程的完整面貌。
确定rgw服务开启
在rgw服务上查看Bucket Notification的API是否开启。开启API才能进行后续的操作。查看方式使用ceph daemon查看对应的字段。
ceph daemon /var/run/ceph/ceph-client.rgw.XXXXX.asok config show | grep rgw_enable_apis
返回结果里面有notifications字段即可。
创建topic
操作bucket notification API可以用Postman或者直接shell的curl命令发送HTTP请求,也可以通过工具发送。awscli是用python boto3库实现的管理工具。以下操作基于awscli。安装awscli工具:
yum install awscli
使用前需要对用户、密钥等对象存储访问基础信息进行配置。
aws configure
awscli的一些案例可以在awscli案例中找到,很有帮助。
数字签名上的设置,否则会返回签名因为不同版本差异会报有误。
aws configure set default.sns.signature_version s3
创建topic
aws --region=default --endpoint-url http://10.101.17.45:7999 sns create-topic --name=mytopic --attributes='{"push-endpoint": "amqp://10.101.17.13:5672", "amqp-exchange": "ex1", "amqp-ack-level": "broker"}'
其中10.101.17.45:7999是rgw对外提供的服务地址,mytopic为新创建topic的名称。amqp://10.101.17.13:5672是rabbitmq的IP和端口,这里如果不带user和passwd默认是guest/guest,vhost默认是/。ex1为rabbitmq上exchange的名称。
返回结果表明创建成功
{
"TopicArn": "arn:aws:sns:default::mytopic"
}
该结果的固定格式如下:
arn:aws:sns:<zone-group>:<tenant>:<topic>
查看集群topic方式
radosgw-admin topic list
查看topic的配置
aws --region=default --endpoint-url http://10.101.17.45:7999 sns get-topic-attributes --topic-arn="arn:aws:sns:default::mytopic"
创建通知配置
有了topic之后需要指定对应的桶通知哪些内容。需要指定一个已经存在的桶(例子中是testwxc),设定桶通知的配置,如ID、对应的topic、事件和信息过滤器。
aws --region=default --endpoint-url http://10.101.17.45:7999 s3api put-bucket-notification-configuration --bucket=testwxc --notification-configuration='{"TopicConfigurations": [{"Id": "notif1", "TopicArn": "arn:aws:sns:default::mytopic", "Events": ["s3:ObjectCreated:*", "s3:ObjectRemoved:*"], "Filter": {"Key": {"FilterRules": [{"Name": "regex", "Value": "([a-z]+)"}]}}}]}'
查看已经成功设置的内容
aws --region=default --endpoint-url http://10.101.17.45:7999 s3api get-bucket-notification-configuration --bucket=testwxc
三、事件获取
完成RGW服务侧的配置之后,使用go语言写一个接收程序(消费者)获取推送的通知信息。需要使用到amqp的库。获取该库命令:
go get github/streadway/amqp
使用go get获取慢的情况,可以参考这篇文章:解决go get下载慢的方法,亲测有效。
代码完成流程:
- 1、连接rabbitmq-server;
- 2、获得Channel;
- 3、定义获取Exchange、Queue;
- 4、将队列和Routing key绑定;
- 5、消费Queue中的内容。
package main
import (
"log"
"github.com/streadway/amqp"
)
func quitOnError(err error) {
if err != nil {
log.Fatal(err)
}
}
func main() {
conn, err := amqp.Dial("amqp://guest:guest@10.101.17.13:5672/")
quitOnError(err)
defer conn.Close()
ch, err := conn.Channel()
quitOnError(err)
defer ch.Close()
err = ch.ExchangeDeclare(
"ex1", // exchange的名称
"topic", // exchange的类型
true, // 是否持久化(durable)
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
quitOnError(err)
queue, err := ch.QueueDeclare(
"", // 队列名称,默认不填写
true, //是否持久化
false, // delete when usused
true, // exclusive
false, // no-wait
nil, // arguments
)
quitOnError(err)
routingKey := "mytopic"
err = ch.QueueBind(
queue.Name, // 队列名称
routingKey, // 指定routing key
"ex1", // 指定exchange的名称
false,
nil,
)
quitOnError(err)
msgs, err := ch.Consume(
queue.Name, // 队列名称
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
quitOnError(err)
run := make(chan bool)
go func() {
for item := range msgs {
log.Printf(" %s", item.Body)
}
}()
log.Printf("Waiting for messages. Press CTRL+C to quit\n")
<-run
}
运行之后通过s3cmd上传对象,可以获取到对象创建的消息。
四、过程中遇到的问题
1、没有创建exchange
即便有操作,但是没有获得通知信息。查看ceph rgw服务日志看到如下字段
connection (10.101.17.13:5672/) retry failed. error: RGW_AMQP_STATUS_VERIFY_EXCHANGE_FAILED
原因:ceph rgw服务本身并不创建exchange需要在rabbitmq上进行创建。需要在rabbitmq页面服务exchange的tab页中进行创建。如下图所示:
创建之后报错会消失。
2、消息没有被路由
查看ceph rgw服务日志看到如下字段
AMQP run: message was not routable
AMQP run: ignoring non n/ack messages. frame type: 2
AMQP run: ignoring non n/ack messages. frame type: 3
查看了一下代码src/rgw/rgw_amqp.cc
。
注释表明该消息被返回给发送者。
原因:ceph rgw产生的消息到达exchange之后,并不知道消息应该推到哪个queue,即表示不能routable。rabbitmq要通过routingkey将exchange的信息分发到queue中。
通过rabbitmq官方教程中的图就能更为清楚的说明消息队列的结构。其中routes根据routkeying发送到队列中。
但是,routing key是什么?这也是我花了很久没有走通的地方。回过头来看官方文档已经说明了这一点。
翻译过来“create topic使用的名称是exchange向queue分发消息时的routing key”。
五、 其他常用的操作
关闭rabbitmq-server
关闭rabbitmq的命令
rabbitmqctl stop
删除已经创建的topic
使用awscli
aws --region=default --endpoint-url http://10.101.17.45:7999 sns delete-topic --topic-arn "arn:aws:sns:default::mytopic"
该操作需要aws配置的用户拥有删除的能力(即op_mask拥有delete)。
或者通过radosgw-admin的命令:
radosgw-admin topic rm --topic mytopic
查看是否消息被正常发送
通过ceph daemon可以查看消息发送成功和失败的统计,即pubsub_push_ok和pubsub_push_failed等参数,这些参数Bucket Notification是和PubSub模块合用。
六、写在最后
官方文档使用时候可能缺少一些步骤,造成功能无法跑起来。这个时候无比信服“便宜的才是最贵的”这句话。开源软件看上去功能都有,好不好用是另外一回事儿。能跑起来也需要花点时间。
附在邮件列表中能找到最为有用的讨论:S3 Bucket Notification requirement
另外,还有ceph Bucket Notification作者的功能介绍PubSub and Notifications in Ceph