Hadoop实战:使用Maxwell把MySQL数据实时同步到HDFS

Maxwell介绍

Maxwell是一个守护程序,一个应用程序,能够读取MySQL Binlogs然后解析输出为json。支持数据输出到Kafka中,支持表和库过滤。

→ Reference:http://maxwells-daemon.io

→ Download: https://github.com/zendesk/maxwell/releases/download/v1.10.3/maxwell-1.10.3.tar.gz

→ Source: https://github.com/zendesk/maxwell

配置MySQL->Maxwell->Kafka->Flume->HDFS

1)MySQL配置要求

配置要求

[mysqld]

server-id=1

log-bin=master

binlog_format=row

binlog_row_image=FULL

权限要求

GRANT ALL on maxwell.* to 'maxwell'@'%' identified by 'maxwell';

GRANT ALL on maxwell.* to 'maxwell'@'localhost' identified by 'maxwell';

GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE on *.* to 'maxwell'@'%';

GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE on *.* to 'maxwell'@'localhost';

2)安装配置Kafka

确认已安装java运行环境,直接解压Kafka即可使用。

$ tar xvf kafka_2.10-0.10.2.1.tgz -C /usr/local/elk

解压后,编辑配置文件:

$ cat /usr/local/elk/kafka_2.10-0.10.2.1/config/server.properties    

############################# Server Basics #############################

broker.id=0

delete.topic.enable=true


############################# Socket Server Settings #############################

listeners=PLAINTEXT://0.0.0.0:9092

num.network.threads=3

num.io.threads=8

socket.send.buffer.bytes=102400

socket.receive.buffer.bytes=102400

socket.request.max.bytes=104857600


############################# Log Basics #############################

log.dirs=/tmp/kafka-logs

num.partitions=1

num.recovery.threads.per.data.dir=1


############################# Log Flush Policy #############################

log.flush.interval.messages=10000

log.flush.interval.ms=1000


############################# Log Retention Policy #############################

log.retention.hours=168

log.segment.bytes=1073741824

log.retention.check.interval.ms=300000


############################# Zookeeper #############################

zookeeper.connect=localhost:2181

zookeeper.connection.timeout.ms=6000

kafka需要依赖zookeeper,所以需要先启动zookeeper。

$ nohup /usr/local/elk/kafka_2.10-0.10.2.1/bin/zookeeper-server-start.sh /usr/local/elk/kafka_2.10-0.10.2.1/config/zookeeper.properties &

启动Kafka Server:(指定JMX_PORT端口,可以通过Kafka-manager获取统计信息)

$ nohup /usr/local/elk/kafka_2.10-0.10.2.1/bin/kafka-server-start.sh /usr/local/elk/kafka_2.10-0.10.2.1/config/server.properties &

3)安装配置Flume

去Apache官网下载Flume二进制安装包,然后解压即可。

tar xvf apache-flume-1.7.0-bin.tar.gz -C /usr/local/

ln -sv /usr/local/apache-flume-1.7.0-bin/ /usr/local/flume

设置环境变量

$ cat /etc/profile.d/flume.sh

export FLUME_HOME=/usr/local/flume

export FLUME_CONF_DIR=$FLUME_HOME/conf

export PATH=$PATH:$FLUME_HOME/bin

查看Flume版本

$ flume-ng version

Flume 1.7.0

Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git

Revision: 511d868555dd4d16e6ce4fedc72c2d1454546707

Compiled by bessbd on Wed Oct 12 20:51:10 CEST 2016

创建配置文件和环境变量

$ cp -fr /usr/local/flume/conf/flume-conf.properties.template /usr/local/flume/conf/flume.conf

$ cp -fr /usr/local/flume/conf/flume-env.sh.template /usr/local/flume/conf/flume-env.sh

如果上面的JAVA_HOME设置好了,这里其实不需要设置flume-env.sh,也可以选择配置。

$ cat /usr/local/flume/conf/flume-env.sh

# Enviroment variables can be set here.

export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.131-3.b12.el7_3.x86_64/jre


# Give Flume more memory and pre-allocate, enable remote monitoring via JMX

export JAVA_OPTS="-Xms100m -Xmx2000m -Dcom.sun.management.jmxremote"


# as it may result in logging sensitive user information or encryption secrets.

export JAVA_OPTS="$JAVA_OPTS -Dorg.apache.flume.log.rawdata=true -Dorg.apache.flume.log.printconfig=true "


# Note that the Flume conf directory is always included in the classpath.

#FLUME_CLASSPATH=""

4)安装配置Maxwell

Maxwell存储在MySQL服务器本身所需要的所有状态,在schema_database选项指定的数据库中。默认情况下, 数据库被命名为maxwell。

$ cd /usr/local/maxwell/;./bin/maxwell --user='maxwell' --password='maxwell' --host='127.0.0.1' --port='3306' --producer=stdout

MySQL创造点数据

mysql> create database hadoop charset utf8;

Query OK, 1 row affected (0.02 sec)


mysql> use hadoop;

Database changed

mysql> create table test(id int,name varchar(10),address varchar(20));

Query OK, 0 rows affected (0.00 sec)


mysql> insert into test values(1,'dkey','ShangHai');

Query OK, 1 row affected (0.01 sec)

然后可以看到Maxwell的输出信息:

04:16:48,341 INFO OpenReplicator - starting replication at mysql-bin.000004:6777

04:18:18,654 INFO AbstractSchemaStore - storing schema @Position[BinlogPosition[mysql-bin.000004:136974], lastHeartbeat=1497601097500]

after applying "create database hadoop charset utf8" to hadoop, new schema id is 2

04:20:24,430 INFO AbstractSchemaStore - storing schema @Position[BinlogPosition[mysql-bin.000004:255163], lastHeartbeat=1497601224355]

after applying "create table test(id int,name varchar(10),address varchar(20))" to hadoop, new schema id is 3

{"database":"hadoop","table":"test","type":"insert","ts":1497601280,"xid":929,"commit":true,"data":{"id":1,"name":"dkey","address":"ShangHai"}}

5)数据输出到HDFS

Kafka创建topic

$ /usr/local/kafka/bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --create --topic maxwell --partitions 20 --replication-factor 1

查看主题

$ /usr/local/kafka/bin/kafka-topics.sh --list --zookeeper=127.0.0.1:2181 maxwell

查看主题详情

$ /usr/local/kafka/bin/kafka-topics.sh --zookeeper=127.0.0.1:2181 --describe --topic maxwell

Topic:maxwell PartitionCount:1 ReplicationFactor:1 Configs:

Topic: maxwell Partition: 0 Leader: 0 Replicas: 0 Isr: 0

提供一份Flume配置文件(从Kafka收集日志到HDFS)

$ cat /usr/local/flume/conf/mysql-flume-hdfs.conf

a1.sources = r1

a1.sinks = k1

a1.channels = c1


# Describe/configure the source

a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource

a1.sources.r1.zookeeperConnect = 127.0.0.1:2181

a1.sources.r1.topic = maxwell

a1.sources.r1.groupId = flume

a1.sources.r1.channels = c1

a1.sources.r1.interceptors = i1

a1.sources.r1.interceptors.i1.type = timestamp

a1.sources.r1.kafka.consumer.timeout.ms = 100


# Describe the sink

a1.sinks.k1.type = hdfs

#a1.sinks.k1.hdfs.path = /mysql/%{topic}/%y-%m-%d

a1.sinks.k1.hdfs.path = hdfs://10.10.0.186:8020/mysql/%{topic}/%y-%m-%d

a1.sinks.k1.hdfs.rollInterval = 5

a1.sinks.k1.hdfs.rollSize = 0

a1.sinks.k1.hdfs.rollCount = 0

a1.sinks.k1.hdfs.fileType = DataStream

a1.sinks.k1.channel = c1


# Use a channel which buffers events in memory

a1.channels.c1.type = memory

a1.channels.c1.capacity = 10000

a1.channels.c1.transactionCapacity = 1000


# Bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

启动Flume

$ nohup flume-ng agent --conf /usr/local/flume/conf --conf-file /usr/local/flume/conf/mysql-flume-hdfs.conf --name a1 -Dflume.root.logger=INFO,console &

如果启动Flume时报错:ERROR – org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:146)] Failed to start agent because dependencies were not found in classpath. Error follows.java.lang.NoClassDefFoundError: org/apache/hadoop/io/SequenceFile$CompressionType.

可能是因为你的Flume是独立部署,需要依赖Hadoop HDFS的jar包,解决方法也很简单,就是在Flume主机上解压好Hadoop的二进制安装包,然后输出Hadoop环境变量即可,Flume会根据环境变量自动找到相关的依赖jar包。具体可以看:Hadoop实战:Flume输入日志到HDFS报错解决

另外,当Flume-ng正常运行后,写入HDFS时报错:java.lang.NoClassDefFoundError: org/apache/hadoop/io/SequenceFile$CompressionType

Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.AccessControlException): Permission denied: user=root, access=WRITE, inode=”/”:hadoop:supergroup:drwxr-xr-x.

这个提示很明显,就是没有写入权限(因为你当前运行flume-ng的用户不是Hadoop用户),解决方案也很简单,就是切换到Hadoop用户执行flume-ng命令即可。或者开启HDFS允许所有用户进行文件写入,默认可能你没有开启。具体可以看:Hadoop实战:Flume输入日志到HDFS报错解决

启动Maxwell

$ cd /usr/local/maxwell/;./bin/maxwell \

--user='maxwell' \

--password='maxwell' \

--host='127.0.0.1' \

--port='3306' \

--producer=kafka \

--kafka.bootstrap.servers=127.0.0.1:9092

测试MySQL->Maxwell->Kafka->Flume->HDFS

相关组件现在都已经跑通了,接下来就是测试了,我们在MySQL插入一条数据:

mysql> insert into hadoop.test values(5,'dkey5','Shanghai');

Query OK, 1 row affected (0.00 sec)

查看Kafka队列

$ /usr/local/kafka/bin/kafka-console-consumer.sh -zookeeper=127.0.0.1:2181 --from-beginning --topic maxwell

Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].

{"database":"hadoop","table":"test","type":"insert","ts":1497607783,"xid":2414,"commit":true,"data":{"id":5,"name":"dkey5","address":"Shanghai"}}

然后去HDFS查看:

[hadoop@hadoop-nn ~]$ hdfs dfs -ls /mysql/maxwell/17-06-19/

Found 1 items

-rw-r--r--   3 hadoop supergroup        148 2017-06-19 03:57 /mysql/maxwell/17-06-19/FlumeData.1497859019506

[hadoop@hadoop-nn ~]$ hdfs dfs -cat /mysql/maxwell/17-06-19/FlumeData.1497859019506

{"database":"hadoop","table":"test","type":"insert","ts":1497859014,"xid":372064,"commit":true,"data":{"id":5,"name":"dkey5","address":"Shanghai"}}

会自动创建相关目录,并生成一个文件。

总结

整个MySQL->Maxwell->Flume->HDFS流程算是跑通了,但是此时也仅限于玩一玩而已,包括Flume和Kakfa都得深入学习一下。另外,我们可以看到写入HDFS的数据时json的,可能还需要提取只需要的数据,另外,对于update或delete操作目前还不知道要怎么处理。生产使用难度很大。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,294评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,493评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,790评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,595评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,718评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,906评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,053评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,797评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,250评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,570评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,711评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,388评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,018评论 3 316
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,796评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,023评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,461评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,595评论 2 350

推荐阅读更多精彩内容