avro是什么这里不赘述了。不懂的同学请翻阅资料。
kafka c++客户端的安装请参考我的另外一篇《kafka c++客户端安装指南》
为了方便,kafka c++客户端直接用了cppkafka。
安装avro c++
下载地址 https://avro.apache.org/releases.html
我安装的版本是1.8.3,大家自行参考
安装目录在lang/c++下,使用cmake编译。编译完成后会生成
动态库和静态库
libavrocpp_s.a
libavrocpp.so.1.8.3-SNAPSHOT.0
avro c++使用
参考官方教程 http://avro.apache.org/docs/1.8.0/api/cpp/html/
主要的使用代码demo,在lang/c++/samples下都有。参考着看。
和cppkafka配合使用需要注意的地方
avro encode的时候,一定要注意编码的转换。下面是参考示例。
// 自定义结构,对应的avro json未request.json。
// 内容省略
request req;
auto out = avro::memoryOutputStream(); // 获取输出流
avro::EncoderPtr e = avro::binaryEncoder(); // 编码器
e->init(*out); // 用输出流初始化编码器
avro::encode(*e, req); // 将req内容编码
// 编码后的输出流需要转输入流,再转给cppkafka的Buffer对象接收
auto in = avro::memoryInputStream(*out);
// 注意avro中底层储存字节都是unsigned char,而不是常见的string中的char!
// 所以先用unsigned char的stringbuf接收流中的内容
basic_stringbuf<unsigned char> strbuf;
basic_ostream<unsigned char> os(&strbuf);
const unsigned char *h = NULL;
const unsigned char *p = NULL;
size_t total=0;
size_t n = 0;
while (in->next(&p, &n)) {
os.write(p, n);
if(total == 0) {
h = p;
}
total += n;
}
// 初始化kafka broker list
cppkafka::Configuration config = {
{ "metadata.broker.list", "192.168.2.90:9092" }
};
// 这个是个绕人的地方,初始化cppkafka buffer的时候不可以使用stringbuf的str函数。会导致编码错乱。
cppkafka::Buffer buf(p, n);
cppkafka::MessageBuilder builder("test");
builder.payload(buf);
cppkafka::Producer producer(config);
producer.produce(builder);
producer.flush();