Chapter 1 Data Warehouse Concepts
A data warehouse (Data Warehouse, abbreviated DW or DWH) is the strategic collection of data that supports every decision-making process in an enterprise with data from all of its systems.
Analyzing the data in the warehouse helps an enterprise improve business processes, control costs, raise product quality, and so on.
The data warehouse is not the data's final destination; rather, it prepares the data for that final destination. This preparation includes cleaning, transforming, classifying, reorganizing, merging, splitting, and aggregating the data.
Chapter 2 Project Requirements
2.1 Requirements Analysis
1. Collect instrumented user-behavior data in real time
2. Build the data warehouse in layers
3. Import business data on a daily schedule
4. Produce report analyses from the data in the warehouse
2.2 Project Framework
2.2.1 Technology Selection
Data collection and transport: Flume, Kafka, Logstash, DataX, Sqoop
Data storage: Hive, MySQL, HDFS, HBase, S3
Data computation: Spark, Hive, Tez, Flink, Storm
Data query: Presto, Impala, Kylin
2.2.2 Typical System Architecture Design
2.2.3 Typical System Data Flow Design
2.2.4 Framework Version Selection
| Software | Version |
|---|---|
| Hadoop | 2.7.2 |
| Flume | 1.7.0 |
| Kafka | 0.11.0.2 |
| Kafka Manager | 1.3.3.22 |
| Hive | 1.2.1 |
| Sqoop | 1.4.6 |
| MySQL | 5.7 |
| Azkaban | 2.5.0 |
| Java | 1.8 |
| Zookeeper | 3.4.10 |
Note: when selecting frameworks, avoid the very latest release; choose a stable version released roughly six months before the latest one.
2.2.5 Cluster Resource Planning
| | Server 1 | Server 2 | Server 3 |
|---|---|---|---|
| HDFS | NameNode, DataNode | DataNode | DataNode |
| Yarn | NodeManager | ResourceManager, NodeManager | NodeManager |
| Zookeeper | Zookeeper | Zookeeper | Zookeeper |
| Flume (log collection) | Flume | Flume | |
| Kafka | Kafka | Kafka | Kafka |
| Flume (Kafka consumer) | | | Flume |
| Hive | Hive | | |
| MySQL | MySQL | | |
Chapter 3 Data Generation Module
3.1 CDR Data Fields
private String sysId; // platform code
private String serviceName; // interface service name
private String homeProvinceCode; // home province code
private String visitProvinceCode; // visited province code
private String channelCode; // channel code
private String serviceCode; // business serial number
private String cdrGenTime; // start time of the record
private String duration; // duration
private String recordType; // CDR record type
private String imsi; // 15-digit code beginning with 46000
private String msisdn; // usually the mobile phone number
private String dataUpLinkVolume; // uplink traffic volume
private String dataDownLinkVolume; // downlink traffic volume
private String charge; // charge
private String resultCode; // result code
3.2 CDR Data Format:
{"cdrGenTime":"20211206165309844","channelCode":"wRFUC","charge":"62.28","dataDownLinkVolume":"2197","dataUpLinkVolume":"467","duration":"1678","homeProvinceCode":"551","imsi":"460007237312954","msisdn":"15636420864","recordType":"gprs","resultCode":"000000","serviceCode":"398814910321850","serviceName":"a9icp8xcNy","sysId":"U1ob6","visitProvinceCode":"451"}
3.3 Writing a Project to Simulate CDR Data
See the project: cdr_gen_app. A simplified sketch of the generation logic is shown below.
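The generator's source code lives in the cdr_gen_app project and is not reproduced here. As a rough illustration only, the sketch below shows one way a record matching the format in section 3.2 could be assembled; the class and helper names (CdrRecordSketch, randomRecord, randomDigits, randomString) and the value ranges are assumptions for this example, not taken from the actual project.

// Hypothetical sketch of a CDR generator; the real logic lives in cdr_gen_app.
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Random;
import java.util.StringJoiner;

public class CdrRecordSketch {

    private static final Random RANDOM = new Random();

    public static String randomRecord() {
        // Field names and ordering follow sections 3.1 and 3.2; all values are strings.
        Map<String, String> cdr = new LinkedHashMap<>();
        cdr.put("cdrGenTime", LocalDateTime.now()
                .format(DateTimeFormatter.ofPattern("yyyyMMddHHmmssSSS")));
        cdr.put("channelCode", randomString(5));
        cdr.put("charge", String.format("%.2f", RANDOM.nextDouble() * 100));
        cdr.put("dataDownLinkVolume", String.valueOf(RANDOM.nextInt(4000)));
        cdr.put("dataUpLinkVolume", String.valueOf(RANDOM.nextInt(4000)));
        cdr.put("duration", String.valueOf(RANDOM.nextInt(3600)));
        cdr.put("homeProvinceCode", String.valueOf(100 + RANDOM.nextInt(900)));
        cdr.put("imsi", "46000" + randomDigits(10));
        cdr.put("msisdn", "1" + randomDigits(10));
        cdr.put("recordType", RANDOM.nextBoolean() ? "gprs" : "volte");
        cdr.put("resultCode", "000000");
        cdr.put("serviceCode", randomDigits(15));
        cdr.put("serviceName", randomString(10));
        cdr.put("sysId", randomString(5));
        cdr.put("visitProvinceCode", String.valueOf(100 + RANDOM.nextInt(900)));

        // Assemble a flat JSON object by hand, matching the single-line layout in 3.2.
        StringJoiner json = new StringJoiner(",", "{", "}");
        cdr.forEach((k, v) -> json.add("\"" + k + "\":\"" + v + "\""));
        return json.toString();
    }

    private static String randomDigits(int n) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < n; i++) sb.append(RANDOM.nextInt(10));
        return sb.toString();
    }

    private static String randomString(int n) {
        String chars = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < n; i++) sb.append(chars.charAt(RANDOM.nextInt(chars.length())));
        return sb.toString();
    }

    public static void main(String[] args) {
        // Print a few sample records, one JSON object per line.
        for (int i = 0; i < 3; i++) {
            System.out.println(randomRecord());
        }
    }
}

Writing one such JSON object per line produces a file with the same layout as the datas/call.log sample shown below.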
Package the project, deploy it to the hadoop101 machine, and run it; then check that data is being generated:
[hadoop@hadoop101 ~]$ head datas/call.log
{"cdrGenTime":"20211206173630762","channelCode":"QEHVc","charge":"78.59","dataDownLinkVolume":"3007","dataUpLinkVolume":"3399","duration":"1545","homeProvinceCode":"571","imsi":"460002441249614","msisdn":"18981080935","recordType":"gprs","resultCode":"000000","serviceCode":"408688053609568","serviceName":"8lr23kV4C2","sysId":"j4FbV","visitProvinceCode":"931"}
{"cdrGenTime":"20211206173630884","channelCode":"TY4On","charge":"31.17","dataDownLinkVolume":"1853","dataUpLinkVolume":"2802","duration":"42","homeProvinceCode":"891","imsi":"460008474876800","msisdn":"15998955319","recordType":"gprs","resultCode":"000000","serviceCode":"739375515156215","serviceName":"6p87h0OHdC","sysId":"UnI9V","visitProvinceCode":"991"}
...
Chapter 4 Data Collection
4.1 Hadoop Environment Preparation
| | Hadoop101 | Hadoop102 | Hadoop103 |
|---|---|---|---|
| HDFS | NameNode, DataNode | DataNode | DataNode |
| Yarn | NodeManager | ResourceManager, NodeManager | NodeManager |
4.1.1 Adding the LZO Support Package
1) Download the hadoop-lzo project:
https://github.com/twitter/hadoop-lzo/archive/master.zip
2) The downloaded file, hadoop-lzo-master, is a zip archive. Unzip it and build it with Maven; the build produces hadoop-lzo-0.4.20.
3) Copy the built hadoop-lzo-0.4.20.jar into hadoop-2.7.2/share/hadoop/common/
[hadoop@hadoop101 common]$ pwd
/opt/modules/hadoop-2.7.2/share/hadoop/common
[hadoop@hadoop101 common]$ ls
hadoop-lzo-0.4.20.jar
4) Sync hadoop-lzo-0.4.20.jar to hadoop102 and hadoop103
[hadoop@hadoop101 common]$ xsync hadoop-lzo-0.4.20.jar
4.1.2 Adding Configuration
1) Add the following to core-site.xml to enable LZO compression
<property>
<name>io.compression.codecs</name>
<value>
org.apache.hadoop.io.compress.GzipCodec,
org.apache.hadoop.io.compress.DefaultCodec,
org.apache.hadoop.io.compress.BZip2Codec,
org.apache.hadoop.io.compress.SnappyCodec,
com.hadoop.compression.lzo.LzoCodec,
com.hadoop.compression.lzo.LzopCodec
</value>
</property>
<property>
<name>io.compression.codec.lzo.class</name>
<value>com.hadoop.compression.lzo.LzoCodec</value>
</property>
2) Sync core-site.xml to hadoop102 and hadoop103
[hadoop@hadoop101 hadoop]$ xsync core-site.xml
4.1.3 Starting the Cluster
[hadoop@hadoop101 hadoop-2.7.2]$ sbin/start-dfs.sh
[hadoop@hadoop102 hadoop-2.7.2]$ sbin/start-yarn.sh
4.1.4 Verification
1) Check the web UI and the processes
- Web UI: http://hadoop101:50070
- Processes: run jps on each node to check its status.
4.2 Zookeeper Environment Preparation
| | Hadoop101 | Hadoop102 | Hadoop103 |
|---|---|---|---|
| Zookeeper | Zookeeper | Zookeeper | Zookeeper |
4.2.1 Zookeeper Cluster Start/Stop Script
1) Create the script zk.sh in the /home/hadoop/bin directory on hadoop101
[hadoop@hadoop101 bin]$ vim zk.sh
Put the following into the script:
#! /bin/bash
case $1 in
"start"){
for i in hadoop101 hadoop102 hadoop103
do
ssh $i "/opt/modules/zookeeper-3.4.10/bin/zkServer.sh start"
done
};;
"stop"){
for i in hadoop101 hadoop102 hadoop103
do
ssh $i "/opt/modules/zookeeper-3.4.10/bin/zkServer.sh stop"
done
};;
esac
2) Make the script executable
[hadoop@hadoop101 bin]$ chmod +x zk.sh
3) Start the Zookeeper cluster
[hadoop@hadoop101 modules]$ zk.sh start
4) Stop the Zookeeper cluster
[hadoop@hadoop101 modules]$ zk.sh stop
4.3 Flume Environment Preparation
| | Hadoop101 | Hadoop102 | Hadoop103 |
|---|---|---|---|
| Flume (log collection) | Flume | | |
4.3.1 Flume Configuration Analysis
4.3.2 Flume Configuration Details
(1) Create the flume1.conf file in the /opt/datas/calllogs/flume1/ directory
[hadoop@hadoop101 conf]$ vim flume1.conf
Put the following into the file:
a1.sources=r1
a1.channels=c1 c2
a1.sinks=k1 k2
# configure source
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /opt/datas/calllogs/flume/log_position.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/datas/calllogs/records/calllogs.+
a1.sources.r1.fileHeader = true
a1.sources.r1.channels = c1 c2
#interceptor
a1.sources.r1.interceptors = i1 i2
a1.sources.r1.interceptors.i1.type = com.cmcc.jackyan.flume.interceptor.LogETLInterceptor$Builder
a1.sources.r1.interceptors.i2.type = com.cmcc.jackyan.flume.interceptor.LogTypeInterceptor$Builder
# selector
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = recordType
a1.sources.r1.selector.mapping.volte= c1
a1.sources.r1.selector.mapping.cdr= c2
# configure channel
a1.channels.c1.type = memory
a1.channels.c1.capacity=10000
a1.channels.c1.byteCapacityBufferPercentage=20
a1.channels.c2.type = memory
a1.channels.c2.capacity=10000
a1.channels.c2.byteCapacityBufferPercentage=20
# configure sink
# volte sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = topic-volte
a1.sinks.k1.kafka.bootstrap.servers = hadoop101:9092,hadoop102:9092,hadoop103:9092
a1.sinks.k1.kafka.flumeBatchSize = 2000
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.channel = c1
# cdr sink
a1.sinks.k2.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k2.kafka.topic = topic-cdr
a1.sinks.k2.kafka.bootstrap.servers = hadoop101:9092,hadoop102:9092,hadoop103:9092
a1.sinks.k2.kafka.flumeBatchSize = 2000
a1.sinks.k2.kafka.producer.acks = 1
a1.sinks.k2.channel = c2
Note: com.cmcc.jackyan.flume.interceptor.LogETLInterceptor and com.cmcc.jackyan.flume.interceptor.LogTypeInterceptor are the fully qualified class names of the custom interceptors; change them to match your own interceptor classes.
4.3.3 Custom Flume Interceptors
Two custom interceptors are defined here: an ETL interceptor and a log-type interceptor.
The ETL interceptor filters out records whose timestamp is invalid or whose JSON is incomplete.
The log-type interceptor separates volte records from the other records so that they can be sent to different Kafka topics.
1) Create a Maven module named flume-interceptor
2) Create the package com.cmcc.jackyan.flume.interceptor
3) Add the following to the pom.xml file
<dependencies>
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.7.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.3.2</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass>com.cmcc.jackyan.appclient.AppMain</mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
4) Create the LogETLInterceptor class in the com.cmcc.jackyan.flume.interceptor package
Flume ETL interceptor LogETLInterceptor:
package com.cmcc.jackyan.flume.interceptor;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;

public class LogETLInterceptor implements Interceptor {

    // Logger, used for debugging
    private static final Logger logger = LoggerFactory.getLogger(LogETLInterceptor.class);

    @Override
    public void initialize() {
    }

    @Override
    public Event intercept(Event event) {
        String body = new String(event.getBody(), Charset.forName("UTF-8"));
        // logger.info("Before:" + body);

        // Prepend a timestamp to the record body
        event.setBody(LogUtils.addTimeStamp(body).getBytes());

        body = new String(event.getBody(), Charset.forName("UTF-8"));
        // logger.info("After:" + body);
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        ArrayList<Event> intercepts = new ArrayList<>();
        // Process every event and keep only the ones that are not dropped
        for (Event event : events) {
            Event interceptEvent = intercept(event);
            if (interceptEvent != null) {
                intercepts.add(interceptEvent);
            }
        }
        return intercepts;
    }

    @Override
    public void close() {
    }

    public static class Builder implements Interceptor.Builder {

        @Override
        public Interceptor build() {
            return new LogETLInterceptor();
        }

        @Override
        public void configure(Context context) {
        }
    }
}
5) Flume log-processing utility class LogUtils
package com.cmcc.jackyan.flume.interceptor;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LogUtils {

    private static Logger logger = LoggerFactory.getLogger(LogUtils.class);

    /**
     * Prepend an epoch-millisecond timestamp and a "|" separator to a log line, e.g. to
     * {"cdrGenTime":"20211207091838913","channelCode":"k82Ce","charge":"86.14","dataDownLinkVolume":"3083","dataUpLinkVolume":"2311","duration":"133","homeProvinceCode":"210","imsi":"460004320498004","msisdn":"17268221069","recordType":"mms","resultCode":"000000","serviceCode":"711869363795868","serviceName":"H79cKXuJO4","sysId":"aGIL4","visitProvinceCode":"220"}
     *
     * @param log the raw JSON record
     */
    public static String addTimeStamp(String log) {
        long t = System.currentTimeMillis();
        return t + "|" + log;
    }
}
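Note that, as written, LogETLInterceptor.intercept(Event) always returns the event after prepending a timestamp, so nothing is ever actually dropped. The description in 4.3.3 implies a validation step; the sketch below shows one possible form of that check. The LogValidator class is a hypothetical helper added here for illustration and is not part of the flume-interceptor module.

// Hypothetical validation helper implied by the ETL interceptor description in 4.3.3.
// Assumes the "timestamp|json" layout produced by LogUtils.addTimeStamp.
public class LogValidator {

    /** Returns true if the line looks like "<13-digit epoch-ms timestamp>|{ ...json... }". */
    public static boolean isValid(String log) {
        if (log == null) {
            return false;
        }
        String[] parts = log.split("\\|", 2);
        if (parts.length != 2) {
            return false;
        }
        // The prefix added by addTimeStamp must be a 13-digit epoch-millisecond value.
        if (!parts[0].matches("\\d{13}")) {
            return false;
        }
        // Rough completeness check on the JSON body.
        String json = parts[1].trim();
        return json.startsWith("{") && json.endsWith("}");
    }
}

With such a helper, intercept(Event) could return null when isValid fails, and the loop in intercept(List<Event>) would then filter those events out.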
6) Flume log-type interceptor LogTypeInterceptor
package com.cmcc.jackyan.flume.interceptor;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class LogTypeInterceptor implements Interceptor {

    // Logger, used for debugging
    private static final Logger logger = LoggerFactory.getLogger(LogTypeInterceptor.class);

    @Override
    public void initialize() {
    }

    @Override
    public Event intercept(Event event) {
        // 1. Get the headers of the event received by Flume
        Map<String, String> headers = event.getHeaders();

        // 2. Get the JSON body as a byte array and convert it to a string
        byte[] json = event.getBody();
        String jsonStr = new String(json);

        String recordType = "";
        // volte records
        if (jsonStr.contains("volte")) {
            recordType = "volte";
        }
        // all other records are treated as ordinary cdr records
        else {
            recordType = "cdr";
        }

        // 3. Store the record type in the event headers (used by the channel selector)
        headers.put("recordType", recordType);
        // logger.info("recordType:" + recordType);

        return event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        ArrayList<Event> interceptors = new ArrayList<>();
        for (Event event : events) {
            Event interceptEvent = intercept(event);
            interceptors.add(interceptEvent);
        }
        return interceptors;
    }

    @Override
    public void close() {
    }

    public static class Builder implements Interceptor.Builder {

        @Override
        public Interceptor build() {
            return new LogTypeInterceptor();
        }

        @Override
        public void configure(Context context) {
        }
    }
}
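One caveat about the type check above: jsonStr.contains("volte") will also match records whose serviceName or channelCode happens to contain the text "volte". A stricter variant (a sketch under that assumption, not the project's code) could key on the recordType field itself:

// Hypothetical stricter check: look for the "recordType":"volte" key/value pair
// in the flat JSON produced by the generator, instead of scanning the whole line.
public class RecordTypeSketch {

    public static String detectRecordType(String jsonStr) {
        if (jsonStr != null && jsonStr.contains("\"recordType\":\"volte\"")) {
            return "volte";
        }
        return "cdr";
    }
}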
7) Packaging
After packaging, only the interceptor jar itself needs to be deployed; its dependencies do not have to be uploaded. The jar must be placed in Flume's lib directory.
8) Copy the built jar into the /opt/modules/flume/lib directory on hadoop101.
[hadoop@hadoop101 lib]$ ls | grep interceptor
flume-interceptor-1.0-SNAPSHOT.jar
9) Start Flume
[hadoop@hadoop101 flume]$ bin/flume-ng agent --conf conf/ --name a1 --conf-file /opt/datas/calllogs/flume1/flume1.conf &
4.3.4 Log Collection Flume Start/Stop Script
1) Create the script calllog-flume1.sh in the /home/hadoop/bin directory
[hadoop@hadoop101 bin]$ vim calllog-flume1.sh
Put the following into the script:
#! /bin/bash
case $1 in
"start"){
for i in hadoop101
do
echo " --------starting collection Flume on $i-------"
ssh $i "nohup /opt/modules/flume/bin/flume-ng agent --name a1 --conf /opt/modules/flume/conf/ --conf-file /opt/datas/calllogs/flume1/flume1.conf &"
done
};;
"stop"){
for i in hadoop101
do
echo " --------stopping collection Flume on $i-------"
ssh $i "ps -ef | grep flume1 | grep -v grep | awk '{print \$2}' | xargs kill -9"
done
};;
esac
Note: nohup keeps a process running after you log out or close the terminal; the name stands for "no hang up".
2) Make the script executable
[hadoop@hadoop101 bin]$ chmod +x calllog-flume1.sh
3) Start flume1
[hadoop@hadoop101 modules]$ calllog-flume1.sh start
4) Stop flume1
[hadoop@hadoop101 modules]$ calllog-flume1.sh stop
4.4 Kafka Environment Preparation
| | Hadoop101 | Hadoop102 | Hadoop103 |
|---|---|---|---|
| Kafka | Kafka | Kafka | Kafka |
4.4.1 Kafka Cluster Start/Stop Script
1) Create the script kafka.sh in the /home/hadoop/bin directory
[hadoop@hadoop101 bin]$ vim kafka.sh
Put the following into the script:
#! /bin/bash
case $1 in
"start"){
for i in hadoop101 hadoop102 hadoop103
do
echo " --------starting Kafka on $i-------"
# export the JMX port, used by KafkaManager for monitoring
ssh $i "export JMX_PORT=9988 && /opt/modules/kafka/bin/kafka-server-start.sh -daemon /opt/modules/kafka/config/server.properties "
done
};;
"stop"){
for i in hadoop101 hadoop102 hadoop103
do
echo " --------stopping Kafka on $i-------"
ssh $i "ps -ef | grep server.properties | grep -v grep| awk '{print \$2}' | xargs kill >/dev/null 2>&1 &"
done
};;
esac
Note: the JMX port must be exported before Kafka starts; it is used later by KafkaManager for monitoring.
2) Make the script executable
[hadoop@hadoop101 bin]$ chmod +x kafka.sh
3) Start the Kafka cluster
[hadoop@hadoop101 modules]$ kafka.sh start
4) Stop the Kafka cluster
[hadoop@hadoop101 modules]$ kafka.sh stop
4.4.2 Listing All Kafka Topics
[hadoop@hadoop101 kafka]$ bin/kafka-topics.sh --zookeeper hadoop101:2181 --list
4.4.3 Creating Kafka Topics
In the /opt/modules/kafka/ directory, create the volte topic and the cdr topic.
1) Create the volte topic
[hadoop@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop101:2181,hadoop102:2181,hadoop103:2181 --create --replication-factor 2 --partitions 3 --topic topic-volte
2) Create the cdr topic
[hadoop@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop101:2181,hadoop102:2181,hadoop103:2181 --create --replication-factor 2 --partitions 3 --topic topic-cdr
4.4.4 Deleting Kafka Topics
[hadoop@hadoop101 kafka]$ bin/kafka-topics.sh --delete --zookeeper hadoop101:2181,hadoop102:2181,hadoop103:2181 --topic topic-volte
[hadoop@hadoop101 kafka]$ bin/kafka-topics.sh --delete --zookeeper hadoop101:2181,hadoop102:2181,hadoop103:2181 --topic topic-cdr
4.4.5 Producing Messages
[hadoop@hadoop101 kafka]$ bin/kafka-console-producer.sh \
--broker-list hadoop101:9092 --topic topic-volte
>hello world
>hadoop hadoop
4.4.6 Consuming Messages
[hadoop@hadoop102 kafka]$ bin/kafka-console-consumer.sh \
--bootstrap-server hadoop101:9092 --from-beginning --topic topic-volte
--from-beginning: reads all of the topic's existing data from the earliest offset. Whether to add this option depends on the business scenario.
4.4.7 Viewing the Details of a Topic
[hadoop@hadoop101 kafka]$ bin/kafka-topics.sh --zookeeper hadoop101:2181 \
--describe --topic topic-volte
4.5 Flume Consuming Kafka Data and Writing to HDFS
| | Hadoop101 | Hadoop102 | Hadoop103 |
|---|---|---|---|
| Flume (Kafka consumer) | | | Flume |
4.5.1 Flume Configuration Analysis
The detailed Flume configuration is as follows.
(1) Create the flume2.conf file in the /opt/datas/calllogs/flume2/ directory on hadoop103
[hadoop@hadoop103 conf]$ vim flume2.conf
Put the following into the file:
## components
a1.sources=r1 r2
a1.channels=c1 c2
a1.sinks=k1 k2
## source1
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = hadoop101:9092,hadoop102:9092,hadoop103:9092
a1.sources.r1.kafka.zookeeperConnect = hadoop101:2181,hadoop102:2181,hadoop103:2181
a1.sources.r1.kafka.topics=topic-volte
## source2
a1.sources.r2.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r2.batchSize = 5000
a1.sources.r2.batchDurationMillis = 2000
a1.sources.r2.kafka.bootstrap.servers = hadoop101:9092,hadoop102:9092,hadoop103:9092
a1.sources.r2.kafka.zookeeperConnect = hadoop101:2181,hadoop102:2181,hadoop103:2181
a1.sources.r2.kafka.topics=topic-cdr
## channel1
a1.channels.c1.type=memory
a1.channels.c1.capacity=100000
a1.channels.c1.transactionCapacity=10000
## channel2
a1.channels.c2.type=memory
a1.channels.c2.capacity=100000
a1.channels.c2.transactionCapacity=10000
## sink1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /origin_data/calllogs/records/topic-volte/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = volte-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 30
a1.sinks.k1.hdfs.roundUnit = second
##sink2
a1.sinks.k2.type = hdfs
a1.sinks.k2.hdfs.path = /origin_data/calllogs/records/topic-cdr/%Y-%m-%d
a1.sinks.k2.hdfs.filePrefix = cdr-
a1.sinks.k2.hdfs.round = true
a1.sinks.k2.hdfs.roundValue = 30
a1.sinks.k2.hdfs.roundUnit = second
## avoid generating many small files
a1.sinks.k1.hdfs.rollInterval = 30
a1.sinks.k1.hdfs.rollSize = 0
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k2.hdfs.rollInterval = 30
a1.sinks.k2.hdfs.rollSize = 0
a1.sinks.k2.hdfs.rollCount = 0
## write LZO-compressed output files
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k2.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = lzop
a1.sinks.k2.hdfs.codeC = lzop
## wiring: bind the sources and sinks to their channels
a1.sources.r1.channels = c1
a1.sinks.k1.channel= c1
a1.sources.r2.channels = c2
a1.sinks.k2.channel= c2
4.5.2 Flume Exception Handling
1) Problem: the consumer Flume throws the following exception at startup
ERROR hdfs.HDFSEventSink: process failed
java.lang.OutOfMemoryError: GC overhead limit exceeded
2) Solution:
(1) Add the following to the /opt/modules/flume/conf/flume-env.sh file on the hadoop101 server
export JAVA_OPTS="-Xms100m -Xmx2000m -Dcom.sun.management.jmxremote"
(2) Sync the configuration to the hadoop102 and hadoop103 servers
[hadoop@hadoop101 conf]$ xsync flume-env.sh
4.5.3 Log Consumption Flume Start/Stop Script
1) Create the script calllog-flume2.sh in the /home/hadoop/bin directory
[hadoop@hadoop101 bin]$ vim calllog-flume2.sh
Put the following into the script:
#! /bin/bash
case $1 in
"start"){
for i in hadoop103
do
echo " --------starting consumer Flume on $i-------"
ssh $i "nohup /opt/modules/flume/bin/flume-ng agent --name a1 --conf /opt/modules/flume/conf/ --conf-file /opt/datas/calllogs/flume2/flume2.conf &"
done
};;
"stop"){
for i in hadoop103
do
echo " --------stopping consumer Flume on $i-------"
ssh $i "ps -ef | grep flume2 | grep -v grep | awk '{print \$2}' | xargs kill -9"
done
};;
esac
2) Make the script executable
[hadoop@hadoop101 bin]$ chmod +x calllog-flume2.sh
3) Start flume2
[hadoop@hadoop101 modules]$ calllog-flume2.sh start
4) Stop flume2
[hadoop@hadoop101 modules]$ calllog-flume2.sh stop
4.6 Collection Pipeline Start/Stop Script
1) Create the script cluster.sh in the /opt/datas/calllogs directory
[hadoop@hadoop101 calllogs]$ vim cluster.sh
Put the following into the script:
#! /bin/bash
case $1 in
"start"){
echo " -------- starting the cluster -------"
echo " -------- starting the Hadoop cluster -------"
/opt/modules/hadoop-2.7.2/sbin/start-dfs.sh
ssh hadoop102 "/opt/modules/hadoop-2.7.2/sbin/start-yarn.sh"
#start the Zookeeper cluster
zk.sh start
#start the log-collection Flume
calllog-flume1.sh start
#start the Kafka cluster
kafka.sh start
sleep 4s;
#start the consumer Flume
calllog-flume2.sh start
};;
"stop"){
echo " -------- stopping the cluster -------"
#stop the consumer Flume
calllog-flume2.sh stop
#stop the Kafka cluster
kafka.sh stop
sleep 4s;
#stop the log-collection Flume
calllog-flume1.sh stop
#stop the Zookeeper cluster
zk.sh stop
echo " -------- stopping the Hadoop cluster -------"
ssh hadoop102 "/opt/modules/hadoop-2.7.2/sbin/stop-yarn.sh"
/opt/modules/hadoop-2.7.2/sbin/stop-dfs.sh
};;
esac
2) Make the script executable
[hadoop@hadoop101 calllogs]$ chmod +x cluster.sh
3) Start the whole pipeline
[hadoop@hadoop101 calllogs]$ ./cluster.sh start
4) Stop the whole pipeline
[hadoop@hadoop101 calllogs]$ ./cluster.sh stop