Flink实战:FlinkSQL接收开启Kerberos认证的Kafka集群数据存入MySQL

微信公众号:大数据开发运维架构

关注可了解更多大数据相关的资讯。问题或建议,请公众号留言;

如果您觉得“大数据开发运维架构”对你有帮助,欢迎转发朋友圈

从微信公众号拷贝过来,格式有些错乱,建议直接去公众号阅读


上篇文章展示了Flink连接Kafka集群的代码,平时我们做统计分析,经常会用到FlinkSQL,这里就贴一下FlinkSQL消费Kafka数据存入Mysql的代码实例,更多实战内容关注微信公众号:“大数据开发运维架构”

版本信息:

flink1.9.0

kafka0.10.0

    mysql5.6.40

废话不多说直接上实战代码:

1.这里mysql数据库recommend中有一张表student,创建表语句:

SETNAMESutf8mb4;SETFOREIGN_KEY_CHECKS =0;-- ------------------------------ Table structure for student-- ----------------------------DROPTABLEIFEXISTS`student`;CREATETABLE`student`(`id`int(64)NULLDEFAULTNULL,`name`varchar(255)CHARACTERSETutf8COLLATEutf8_general_ciNULLDEFAULTNULL,`course`varchar(255)CHARACTERSETutf8COLLATEutf8_general_ciNULLDEFAULTNULL,`score`double(128,0)NULLDEFAULTNULL)ENGINE=InnoDBCHARACTERSET= utf8COLLATE= utf8_general_ci ROW_FORMAT =Compact;SETFOREIGN_KEY_CHECKS =1;

2.对应student表的实体类:

packagecom.hadoop.ljs.flink.sql;/***@author: Created By lujisen*@companyChinaUnicom Software JiNan*@date: 2020-03-01 07:50*@version: v1.0*@description: com.hadoop.ljs.flink.sql */publicclassStudent{/*唯一ID*/intid;/*名字*/    String name;/*课程*/    String course;/*分数*/doublescore;publicStudent(Integer f0, String f1, String f2, Double f3){        id=f0;        name=f1;        course=f2;        score=f3;    }publicintgetId(){returnid;    }publicvoidsetId(intid){this.id = id;    }publicStringgetName(){returnname;    }publicvoidsetName(String name){this.name = name;    }publicStringgetCourse(){returncourse;    }publicvoidsetCourse(String course){this.course = course;    }publicdoublegetScore(){returnscore;    }publicvoidsetScore(doublescore){this.score = score;    }}

3.自定义Sink类,存数据到mysql中:

packagecom.hadoop.ljs.flink.sql;importorg.apache.flink.configuration.Configuration;importorg.apache.flink.streaming.api.functions.sink.RichSinkFunction;importjava.sql.Connection;importjava.sql.DriverManager;importjava.sql.PreparedStatement;/***@author: Created By lujisen*@companyChinaUnicom Software JiNan*@date: 2020-03-01 07:48*@version: v1.0*@description: com.hadoop.ljs.flink.sql */publicclassSinkStudent2MySQLextendsRichSinkFunction{publicstaticfinalString url="jdbc:mysql://10.124.165.31:3306/recommend??useUnicode=true&characterEncoding=UTF-8";publicstaticfinalString userName="root";publicstaticfinalString password="123456a?";privatestaticfinallongserialVersionUID = -4443175430371919407L;    PreparedStatement ps;privateConnection connection;/**这里的open只调用一次*@paramparameters*@throwsException    */@Overridepublicvoidopen(Configuration parameters)throwsException{super.open(parameters);        connection = getConnection();String sql ="replace  into student(id,name,course,score) values(?, ?, ?,?);";ps =this.connection.prepareStatement(sql);    }@Overridepublicvoidclose()throwsException{super.close();if(connection !=null) {            connection.close();        }if(ps !=null) {            ps.close();        }    }/**    * 每条数据的插入都要调用一次 invoke() 方法    **@paramcontext*@throwsException    */@Overridepublicvoidinvoke(Student student, Context context)throwsException{/*对每一条数据进行处理,组装数据*/ps.setLong(1, student.getId());ps.setString(2,student.getName());ps.setString(3, student.getCourse());ps.setDouble(4,student.getScore());        ps.executeUpdate();    }privatestaticConnectiongetConnection(){Connection con =null;try{Class.forName("com.mysql.jdbc.Driver");            con = DriverManager.getConnection(url,userName,password);System.out.println("msql连接成功!");}catch(Exception e) {System.out.println("msql连接失败,错误信息"+ e.getMessage());        }returncon;    }}

4.主函数类,从kafka接收消息,对每行数据进行拆分,注册为临时表,调用自定义SinkStudent2MySQL类,存入数据到student表中:

package com.hadoop.ljs.flink.sql;importorg.apache.flink.api.common.functions.FilterFunction;importorg.apache.flink.api.common.functions.MapFunction;importorg.apache.flink.api.common.serialization.SimpleStringSchema;importorg.apache.flink.api.common.typeinfo.Types;importorg.apache.flink.api.java.tuple.Tuple4;importorg.apache.flink.streaming.api.datastream.DataStream;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;importorg.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;importorg.apache.flink.table.api.EnvironmentSettings;importorg.apache.flink.table.api.Table;importorg.apache.flink.table.api.java.StreamTableEnvironment;importjava.util.Properties;/** * @author: Created By lujisen * @company ChinaUnicom Software JiNan * @date: 2020-03-01 07:47 * @version: v1.0 * @description: com.hadoop.ljs.flink.sql */publicclassFlinkKafkaKerberosSQLConsumer {publicstaticfinalStringkrb5Conf="D:\\kafkaSSL\\krb5.conf";publicstaticfinalStringkafkaJaasConf="D:\\kafkaSSL\\kafka_client_jaas.conf";publicstaticfinalStringtopic="topic2";publicstaticfinalStringconsumerGroup="test_topic2";publicstaticfinalStringbootstrapServer="salver31.hadoop.unicom:6667,salver32.hadoop.unicom:6667";publicstaticvoidmain(String[] args) throws Exception {//在windows中设置JAAS,也可以通过-D方式传入System.setProperty("java.security.krb5.conf", krb5Conf);System.setProperty("java.security.auth.login.config", kafkaJaasConf);              StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();senv.setDefaultLocalParallelism(1);        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(senv, bsSettings);        FlinkKafkaConsumer010 myConsumer =newFlinkKafkaConsumer010(topic,newSimpleStringSchema(),getComsumerProperties());DataStream stream = senv.addSource(myConsumer);stream.filter(newFilterFunction() {@Overridepublicbooleanfilter(Stringvalue) throws Exception {if(null==value||value.split(",").length!=4){returnfalse;                }returntrue;            }        });DataStream> map = stream.map(newMapFunction>() {privatestaticfinal long serialVersionUID =1471936326697828381L;@OverridepublicTuple4 map(Stringvalue) throws Exception {String[] split = value.split(",");returnnewTuple4<>(Integer.valueOf(split[0]), split[1], split[2], Double.valueOf(split[3]));            }        });//将数据注册为临时表,并制定fieldstableEnv.registerDataStream("student", map,"id,name,course,score");Table sqlQuery = tableEnv.sqlQuery("select id,name,course,score  from  student");DataStream> appendStream = tableEnv.toAppendStream(sqlQuery, Types.TUPLE(Types.INT, Types.STRING, Types.STRING,Types.DOUBLE));        appendStream.print();/*将每条数据转换成student实体类数据,sink到mysql中*/appendStream.map(newMapFunction, Student>() {privatestaticfinal long serialVersionUID =-4770965496944515917L;@OverridepublicStudent map(Tuple4 value) throws Exception {returnnewStudent(value.f0, value.f1, value.f2,value.f3);            }}).addSink(newSinkStudent2MySQL());senv.execute("FlinkKafkaKerberosSQLConsumer");    }/*获取Kafka消费端配置*/privatestaticProperties getComsumerProperties() {Properties props =newProperties();props.put("bootstrap.servers",bootstrapServer);props.put("group.id",consumerGroup);props.put("auto.offset.reset","earliest");/*keberos集群,必须制定以下三项配置*/props.put("security.protocol","SASL_PLAINTEXT");props.put("sasl.kerberos.service.name","kafka");props.put("sasl.mechanism","GSSAPI");returnprops;    }}

5.这里贴下pom.xml:

1.9.01.82.111.2.50.10.1.0org.apache.flinkflink-connector-kafka-0.10_2.11${flink.version}org.apache.flinkflink-table-planner-blink_${scala.binary.version}${flink.version}org.apache.flinkflink-table-runtime-blink_${scala.binary.version}${flink.version}org.apache.flinkflink-table-common${flink.version}mysqlmysql-connector-java5.1.46

6.发送数据到kafka,每条记录用逗号“,”拆分:

1001,name1,yuwen1,811002,name2,yuwen2,821003,name3,yuwen3,83

发送数据截图:

最近一些文章都是根据粉丝留言进行编写实战代码,如有其他需求直接给我公众号留言即可,觉得有用,多给转转朋友圈,谢谢关注!!!

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,717评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,501评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,311评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,417评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,500评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,538评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,557评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,310评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,759评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,065评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,233评论 1 343
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,909评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,548评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,172评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,420评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,103评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,098评论 2 352

推荐阅读更多精彩内容