基于Flink的实时告警实现(2):读取数据源

本专题将会从0到1实现告警处理流程,并会讲解实现过程中使用到的Flink中的技术。

1 Flink的工作方式

使用Flink常用的方式是将Flink作为管道和管道之间的处理器,Flink从源中读取数据,进行逻辑计算后,将结果写入到目的,这里的源和目的可以是同一类系统,例如,都是kafka。Flink内置的和扩展的Connectors:https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/

Source -> Transform -> Sink

2 Flink中的无界流数据源

Flink的API对于数据处理来说可以分为两大类:DataStream和DataSet。DataStream用于处理无界流,例如,socket、kafka,DataSet用于处理有界流,例如,对象集合、文件。

2.1 socket

socket数据源可以处理通过套接字传输来的数据:

socketTextStream(String hostname, int port);
socketTextStream(String hostname, int port, String delimiter);
socketTextStream(String hostname, int port, String delimiter, long maxRetry);

hostname是套接字的主机名/IP,port是套接字的端口,delimiter是数据之间的分隔符,默认是"\n",maxRetry是最大等待时间,当读取的套接字关闭后,会尝试重连多少次,每隔一秒尝试一次,默认值是0,如果是0表明不重连,如果小于0表明一直重连。

DEMO如下:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Job {

    public static void main(String[] args) throws Exception {
        // 创建流执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 创建socket流
        DataStream<String> dataStream = env.socketTextStream("localhost", 8888, "\n");
        // 打印流
        dataStream.print();
    }
}

将上面的程序编译打包生成jar包后上传到服务器。这里需要连接socket,先使用nc工具监听8888端口:nc -l 8888,然后提交flink任务。

当在nc里面输入内容后,flink的程序就可以拿到数据。

2.2 Kafka

Kafka应该是跟Flink最匹配的搭档,由于采集数据量巨大,很多系统都会将数据采集后写入Kafka,然后一个程序从kafka读取数据后落地到后端存储,另一个程序通过Flink从Kafka中读,进行逻辑处理。

第一步,添加kafka依赖,这里要注意flink和kafka的版本。

<dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_${scala.version}</artifactId>
        <version>${flink.version}</version>
 </dependency>

其中,flink.version为flink的版本,scala.version为scala的版本。

第二步,构建kafka的配置

// 定义kafka的配置
// bootstrap.servers:kafka brokers的地址,以逗号分割
// group.id:消费者组的ID
Properties kafkaProps = new Properties();
kafkaProps.setProperty("bootstrap.servers", "kafka_brokers_list");
kafkaProps.setProperty("group.id", "group_id_of_consumer");

第三步,读取数据,进行解析,由于存储到kafka的数据是二进制的,需要进行反序列化,而客户端写入kafka中的数据最常用的方式就是JSON数据,因此,我们重点讨论如何将kafka中的JSON数据解析成便于flink处理的数据。

flink提供了JSONKeyValueDeserializationSchema,该类可以将kafka中的数据转换为ObjectNode,其实就相当于是个Map。假设kafka中的数据是:{"name": "luo", "age": 17, "phone": { "home": 123, "work": 456}}, 然后可以通过ObjectNode.get("value").get("name")获取到名字luo,通过Object.get("value").get("phone").get("home")就可以获取到家里的电话123。

flink中方便处理的数据类型有两种:Tuple和对象。因此,将数据用JSONKeyValueDeserializationSchema反序列化,再用map操作将数据转换成Tuple或者对象。

将数据转换成对象的实现方式:

// 传递JSONKeyValueDeserializationSchema的对象,该对象有一个参数
// 表明是否携带kafka的一些元数据
DataStreamSource<ObjectNode> kafkaStream =
        env.addSource(new FlinkKafkaConsumer<>("topic", new JSONKeyValueDeserializationSchema(false),
                kafkaProps));

// 在map的处理函数里面从ObjectNode对象中获取对应的字段,然后生成对象
kafkaStream.map(new MapFunction<ObjectNode, SwitchMetric>() {
    @Override
    public SwitchMetric map(ObjectNode jsonNodes) throws Exception {
        String name = jsonNodes.get("value").get("name").textValue();
        String addr = jsonNodes.get("value").get("addr").textValue();
        return new People(name, addr);
    }
});

将数据转换成Tuple的实现方式:

DataStreamSource<ObjectNode> kafkaStream =
        env.addSource(new FlinkKafkaConsumer<>("topic", new JSONKeyValueDeserializationSchema(false),
                kafkaProps));

kafkaStream.map(new MapFunction<ObjectNode, Tuple2<String, String>>() {
    @Override
    public Tuple2<String, String> map(ObjectNode jsonNodes) throws Exception {
        String name = jsonNodes.get("value").get("name").textValue();
        String addr = jsonNodes.get("value").get("addr").textValue();
        return Tuple2.of(name, addr);
    }
});

除了上面的先使用JSONKeyValueDeserializationSchema反序列化,再使用map操作转换成合适的类型之外,还可以使用自定义的DeserializationSchema对数据进行处理:

public static class MySchema implements KafkaDeserializationSchema<Tuple2<String, String>> {

    private final ObjectMapper mapper = new ObjectMapper();

    @Override
    public Tuple5<String, String> deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
        // 反序列化的主函数:先将数据转换成Map,然后在返回时构造新的Tuple
        // 当然,也可以在这里直接将json字符串解析成对象,不过就需要引入新的处理json的包
        Map<String,Object> t = mapper.readValue(record.value(), Map.class);
        
       return new Tuple2.of(String.valueOf(t.get("name")), String.valueOf(t.get("addr")));
    }

    @Override
    public boolean isEndOfStream(Tuple2<String, String> nextElement) {
        return false;
    }

    @Override
    public TypeInformation<Tuple2<String, String>> getProducedType() {
        return new TupleTypeInfo<>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
    }
}

3 自定义数据源

flink支持自定义数据源,数据源就是可以源源不断地返回数据(对于流而言),因此,数据源可以说就是一个死循环,这个死循环会持续不断地输出数据。如下例所示,MySource实现了SourceFunction接口,里面有两个方法需要实现:

DataStreamSource<People> stream = env.addSource(new MySource());
stream.print();

public static class MySource implements SourceFunction<People> {

    private volatile boolean isRunning = true;

    @Override
    public void run(SourceContext<People> sourceContext) {
        while(isRunning) {
            sourceContext.collect(new People("name", "home"));
        }
    }

    @Override
    public void cancel() {
        this.isRunning = false;
    }
}

4 小结

本文介绍了如何从数据源中读取数据,然后转换成对象或者元组以方便后续的处理,然后介绍了自定义数据源并创建了一个简单的数据源。

5 参考文档

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 219,753评论 6 508
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,668评论 3 396
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 166,090评论 0 356
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 59,010评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 68,054评论 6 395
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,806评论 1 308
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,484评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,380评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,873评论 1 319
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,021评论 3 338
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,158评论 1 352
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,838评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,499评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,044评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,159评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,449评论 3 374
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,136评论 2 356

推荐阅读更多精彩内容