Building a CDR (Call Detail Record) Data Warehouse (1): Data Warehouse Concepts and Data Collection

Chapter 1: Data Warehouse Concepts

A data warehouse (Data Warehouse, abbreviated DW or DWH) is a strategic collection of data drawn from all of an enterprise's systems, built to support every decision-making process.
Analyzing the data in the warehouse helps the enterprise improve business processes, control costs, raise product quality, and so on.
The data warehouse is not the data's final destination; rather, it prepares the data for that destination. This preparation includes cleaning, transformation, classification, reorganization, merging, splitting, aggregation, and more.

Chapter 2: Project Requirements

2.1 Requirements Analysis

1. Collect instrumented user-behavior data in real time
2. Build the data warehouse in layers
3. Import business data on a daily schedule
4. Produce report analyses from the data in the warehouse

2.2 Project Framework

2.2.1 Technology Selection

Data collection and transport: Flume, Kafka, Logstash, DataX, Sqoop
Data storage: Hive, MySQL, HDFS, HBase, S3
Data computation: Spark, Hive, Tez, Flink, Storm
Data querying: Presto, Impala, Kylin

2.2.2 Typical System Architecture Design

2.2.3 Typical System Data Flow Design

2.2.4 Framework Version Selection

Software        Version
Hadoop          2.7.2
Flume           1.7.0
Kafka           0.11.0.2
Kafka Manager   1.3.3.22
Hive            1.2.1
Sqoop           1.4.6
MySQL           5.7
Azkaban         2.5.0
Java            1.8
Zookeeper       3.4.10

Note: when selecting framework versions, avoid the very latest release; pick a stable version released roughly half a year earlier.
2.2.5 Cluster Resource Planning

                       | Server 1           | Server 2                     | Server 3
HDFS                   | NameNode, DataNode | DataNode                     | DataNode
Yarn                   | NodeManager        | ResourceManager, NodeManager | NodeManager
Zookeeper              | Zookeeper          | Zookeeper                    | Zookeeper
Flume (log collection) | Flume              | Flume                        |
Kafka                  | Kafka              | Kafka                        | Kafka
Flume (Kafka consumer) |                    |                              | Flume
Hive                   | Hive               |                              |
MySQL                  | MySQL              |                              |

Chapter 3: Data Generation Module

3.1 CDR Data Fields

private String sysId;               // Platform code
private String serviceName;         // Interface/service name
private String homeProvinceCode;    // Home province code
private String visitProvinceCode;   // Visited province code
private String channelCode;         // Channel code
private String serviceCode;         // Business serial number
private String cdrGenTime;          // Start time (record generation time)
private String duration;            // Duration
private String recordType;          // CDR type
private String imsi;                // 15-digit code starting with 46000
private String msisdn;              // Usually the phone number
private String dataUpLinkVolume;    // Uplink traffic
private String dataDownLinkVolume;  // Downlink traffic
private String charge;              // Charge
private String resultCode;          // Result code

3.2 CDR Data Format:

{"cdrGenTime":"20211206165309844","channelCode":"wRFUC","charge":"62.28","dataDownLinkVolume":"2197","dataUpLinkVolume":"467","duration":"1678","homeProvinceCode":"551","imsi":"460007237312954","msisdn":"15636420864","recordType":"gprs","resultCode":"000000","serviceCode":"398814910321850","serviceName":"a9icp8xcNy","sysId":"U1ob6","visitProvinceCode":"451"}

3.3 Generating Simulated CDR Data

See the cdr_gen_app project for details.
Package the project, deploy it to the hadoop101 machine, and run the following command to verify data generation:

[hadoop@hadoop101 ~]$ head datas/call.log 
{"cdrGenTime":"20211206173630762","channelCode":"QEHVc","charge":"78.59","dataDownLinkVolume":"3007","dataUpLinkVolume":"3399","duration":"1545","homeProvinceCode":"571","imsi":"460002441249614","msisdn":"18981080935","recordType":"gprs","resultCode":"000000","serviceCode":"408688053609568","serviceName":"8lr23kV4C2","sysId":"j4FbV","visitProvinceCode":"931"}
{"cdrGenTime":"20211206173630884","channelCode":"TY4On","charge":"31.17","dataDownLinkVolume":"1853","dataUpLinkVolume":"2802","duration":"42","homeProvinceCode":"891","imsi":"460008474876800","msisdn":"15998955319","recordType":"gprs","resultCode":"000000","serviceCode":"739375515156215","serviceName":"6p87h0OHdC","sysId":"UnI9V","visitProvinceCode":"991"}
...

Chapter 4: Data Collection

4.1 Hadoop Environment Preparation

      | Hadoop101          | Hadoop102                    | Hadoop103
HDFS  | NameNode, DataNode | DataNode                     | DataNode
Yarn  | NodeManager        | ResourceManager, NodeManager | NodeManager

4.1.1 Adding the LZO Support Package

1) Download the hadoop-lzo project:
https://github.com/twitter/hadoop-lzo/archive/master.zip
2) The download is hadoop-lzo-master, a zip archive. Unzip it and build it with Maven to produce hadoop-lzo-0.4.20.jar.
3) Put the built hadoop-lzo-0.4.20.jar into hadoop-2.7.2/share/hadoop/common/:

[hadoop@hadoop101 common]$ pwd
/opt/modules/hadoop-2.7.2/share/hadoop/common
[hadoop@hadoop101 common]$ ls
hadoop-lzo-0.4.20.jar

4) Distribute hadoop-lzo-0.4.20.jar to hadoop102 and hadoop103:

[hadoop@hadoop101 common]$ xsync hadoop-lzo-0.4.20.jar

4.1.2 Adding Configuration

1) Add the following to core-site.xml to enable LZO compression:

    <property>
        <name>io.compression.codecs</name>
        <value>
            org.apache.hadoop.io.compress.GzipCodec,
            org.apache.hadoop.io.compress.DefaultCodec,
            org.apache.hadoop.io.compress.BZip2Codec,
            org.apache.hadoop.io.compress.SnappyCodec,
            com.hadoop.compression.lzo.LzoCodec,
            com.hadoop.compression.lzo.LzopCodec
        </value>
    </property>
    <property>
        <name>io.compression.codec.lzo.class</name>
        <value>com.hadoop.compression.lzo.LzoCodec</value>
    </property>

2) Distribute core-site.xml to hadoop102 and hadoop103:

[hadoop@hadoop101 hadoop]$ xsync core-site.xml

4.1.3 Starting the Cluster

[hadoop@hadoop101 hadoop-2.7.2]$ sbin/start-dfs.sh
[hadoop@hadoop102 hadoop-2.7.2]$ sbin/start-yarn.sh

4.1.4 Verification

1) Check the web UIs and the running processes. An optional programmatic check of the LZO configuration is sketched below.
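
Beyond the web UI and process check, the LZO setup can also be verified programmatically. Below is a minimal, illustrative sketch (the class name LzoCodecCheck is made up for this example): it resolves the .lzo suffix through Hadoop's CompressionCodecFactory, which only succeeds if the codecs configured above are loadable. Run it with the Hadoop classpath, e.g. java -cp "$(hadoop classpath):." LzoCodecCheck.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;

// Illustrative check that the codecs registered in core-site.xml can be loaded.
public class LzoCodecCheck {
    public static void main(String[] args) {
        // Picks up core-site.xml when run with the Hadoop classpath.
        Configuration conf = new Configuration();
        CompressionCodecFactory factory = new CompressionCodecFactory(conf);
        CompressionCodec codec = factory.getCodec(new Path("test.lzo"));
        System.out.println(codec == null
                ? "LZO codec NOT found - check io.compression.codecs and the hadoop-lzo jar"
                : "Resolved codec for .lzo: " + codec.getClass().getName());
    }
}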

4.2 Zookeeper Environment Preparation

          | Hadoop101 | Hadoop102 | Hadoop103
Zookeeper | Zookeeper | Zookeeper | Zookeeper

4.2.2 Zookeeper Cluster Start/Stop Script

1) On hadoop101, create the script in the /home/hadoop/bin directory:

[hadoop@hadoop101 bin]$ vim zk.sh

Write the following content into the script:

#! /bin/bash

case $1 in
"start"){
    for i in hadoop101 hadoop102 hadoop103
    do
        ssh $i "/opt/modules/zookeeper-3.4.10/bin/zkServer.sh start"
    done
    };;
"stop"){
    for i in hadoop101 hadoop102 hadoop103
    do
        ssh $i "/opt/modules/zookeeper-3.4.10/bin/zkServer.sh stop"
    done
    };;
esac

2) Make the script executable:

[hadoop@hadoop101 bin]$ chmod +x zk.sh

3) Start the Zookeeper cluster with the script:

[hadoop@hadoop101 modules]$ zk.sh start

4) Stop the Zookeeper cluster with the script:

[hadoop@hadoop101 modules]$ zk.sh stop

4.3 Flume Environment Preparation

                       | Hadoop101 | Hadoop102 | Hadoop103
Flume (log collection) | Flume     |           |

Flume configuration analysis


Flume configuration details
(1) Create the flume1.conf file in the /opt/datas/calllogs/flume1/ directory on hadoop101:
[hadoop@hadoop101 flume1]$ vim flume1.conf
Add the following content to the file:

a1.sources=r1
a1.channels=c1 c2 
a1.sinks=k1 k2 

# configure source
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /opt/datas/calllogs/flume/log_position.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/datas/calllogs/records/calllogs.+
a1.sources.r1.fileHeader = true
a1.sources.r1.channels = c1 c2

#interceptor
a1.sources.r1.interceptors = i1 i2
a1.sources.r1.interceptors.i1.type = com.cmcc.jackyan.flume.interceptor.LogETLInterceptor$Builder
a1.sources.r1.interceptors.i2.type = com.cmcc.jackyan.flume.interceptor.LogTypeInterceptor$Builder

# selector
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = recordType
a1.sources.r1.selector.mapping.volte= c1
a1.sources.r1.selector.mapping.cdr= c2

# configure channel
a1.channels.c1.type = memory
a1.channels.c1.capacity=10000
a1.channels.c1.byteCapacityBufferPercentage=20

a1.channels.c2.type = memory
a1.channels.c2.capacity=10000
a1.channels.c2.byteCapacityBufferPercentage=20

# configure sink
# volte sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = topic-volte
a1.sinks.k1.kafka.bootstrap.servers = hadoop101:9092,hadoop102:9092,hadoop103:9092
a1.sinks.k1.kafka.flumeBatchSize = 2000
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.channel = c1

# cdr sink
a1.sinks.k2.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k2.kafka.topic = topic-cdr
a1.sinks.k2.kafka.bootstrap.servers = hadoop101:9092,hadoop102:9092,hadoop103:9092
a1.sinks.k2.kafka.flumeBatchSize = 2000
a1.sinks.k2.kafka.producer.acks = 1
a1.sinks.k2.channel = c2

Note: com.cmcc.jackyan.flume.interceptor.LogETLInterceptor and com.cmcc.jackyan.flume.interceptor.LogTypeInterceptor are the fully qualified class names of the custom interceptors. Change them to match your own interceptor classes.

4.3.3 Custom Flume Interceptors

Two custom interceptors are defined here: an ETL interceptor and a log-type interceptor.
The ETL interceptor filters out records whose timestamp is invalid or whose JSON is incomplete.
The log-type interceptor separates volte records from the other records so they can be sent to different Kafka topics.
1) Create a Maven module named flume-interceptor
2) Create the package com.cmcc.jackyan.flume.interceptor
3) Add the following to pom.xml:

    <dependencies>
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-core</artifactId>
            <version>1.7.0</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.3.2</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <mainClass>com.cmcc.jackyan.appclient.AppMain</mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

4) Create the LogETLInterceptor class in the com.cmcc.jackyan.flume.interceptor package.
Flume ETL interceptor, LogETLInterceptor:

package com.cmcc.jackyan.flume.interceptor;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;

public class LogETLInterceptor implements Interceptor {

    // Logger for debugging
    private static final Logger logger = LoggerFactory.getLogger(LogETLInterceptor.class);

    @Override
    public void initialize() {

    }

    @Override
    public Event intercept(Event event) {

        String body = new String(event.getBody(), Charset.forName("UTF-8"));
//        logger.info("Before:" + body);

        event.setBody(LogUtils.addTimeStamp(body).getBytes());

        body = new String(event.getBody(), Charset.forName("UTF-8"));

//        logger.info("After:" + body);

        return event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {

        ArrayList<Event> intercepts = new ArrayList<>();

        // Iterate over all events and drop the invalid ones
        for (Event event : events) {
            Event interceptEvent = intercept(event);
            if (interceptEvent != null) {
                intercepts.add(interceptEvent);
            }
            }
        }
        return intercepts;
    }

    @Override
    public void close() {

    }

    public static class Builder implements Interceptor.Builder {

        @Override
        public Interceptor build() {
            return new LogETLInterceptor();
        }

        @Override
        public void configure(Context context) {
        }
    }
}

5) Flume log-processing utility class, LogUtils:

package com.cmcc.jackyan.flume.interceptor;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LogUtils {

    private static Logger logger = LoggerFactory.getLogger(LogUtils.class);

    /**
     * Prepend a timestamp to a log record
     *
     * @param log the raw JSON record
     */
    public static String addTimeStamp(String log) {

        //{"cdrGenTime":"20211207091838913","channelCode":"k82Ce","charge":"86.14","dataDownLinkVolume":"3083","dataUpLinkVolume":"2311","duration":"133","homeProvinceCode":"210","imsi":"460004320498004","msisdn":"17268221069","recordType":"mms","resultCode":"000000","serviceCode":"711869363795868","serviceName":"H79cKXuJO4","sysId":"aGIL4","visitProvinceCode":"220"}
        long t = System.currentTimeMillis();

        return t + "|" + log;
    }
}
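
The description in 4.3.3 mentions filtering records with invalid timestamps or incomplete JSON, while the classes above only prepend a timestamp. Below is a hedged sketch of what such a check could look like; the validate method is hypothetical and not part of the original project. LogETLInterceptor.intercept(Event) could return null when it returns false, and intercept(List) already skips null events.

    // Hypothetical addition to the LogUtils class above (not in the original project):
    // a crude completeness check for one raw record.
    public static boolean validate(String log) {
        if (log == null) {
            return false;
        }
        String trimmed = log.trim();
        // The generated records are single-line JSON objects.
        if (!trimmed.startsWith("{") || !trimmed.endsWith("}")) {
            return false;
        }
        // Every record is expected to carry cdrGenTime and recordType fields.
        return trimmed.contains("\"cdrGenTime\"") && trimmed.contains("\"recordType\"");
    }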

6) Flume log-type interceptor, LogTypeInterceptor:

package com.cmcc.jackyan.flume.interceptor;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class LogTypeInterceptor implements Interceptor {

    // Logger for debugging
    private static final Logger logger = LoggerFactory.getLogger(LogTypeInterceptor.class);

    @Override
    public void initialize() {

    }

    @Override
    public Event intercept(Event event) {

        // 1. Get the headers of the Flume event
        Map<String, String> headers = event.getHeaders();

        // 2. Get the JSON body received by Flume
        byte[] json = event.getBody();
        // Convert the byte array to a string
        String jsonStr = new String(json);

        String recordType = "" ;

        // volte
        if (jsonStr.contains("volte")) {
            recordType = "volte";
        }
        // cdr
        else {
            recordType = "cdr";
        }

        // 3. Store the record type in the Flume headers
        headers.put("recordType", recordType);

//        logger.info("recordType:" + recordType);

        return event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        ArrayList<Event> interceptors = new ArrayList<>();

        for (Event event : events) {
            Event interceptEvent = intercept(event);

            interceptors.add(interceptEvent);
        }

        return interceptors;
    }

    @Override
    public void close() {

    }

    public static class Builder implements Interceptor.Builder {

        public Interceptor build() {
            return new LogTypeInterceptor();
        }

        @Override
        public void configure(Context context) {

        }
    }
}

7) Packaging
After packaging, only the interceptor jar itself needs to be uploaded, not its dependencies. The jar goes into Flume's lib directory.
8) Copy the built jar into /opt/modules/flume/lib on hadoop101:

[hadoop@hadoop101 lib]$ ls | grep interceptor
flume-interceptor-1.0-SNAPSHOT.jar

9) Start Flume:

[hadoop@hadoop101 flume]$ bin/flume-ng agent --conf conf/ --name a1 --conf-file /opt/datas/calllogs/flume1/flume1.conf &

4.3.4 Log-Collection Flume Start/Stop Script
1) Create the script calllog-flume1.sh in /home/hadoop/bin:

[hadoop@hadoop101 bin]$ vim calllog-flume1.sh

Write the following content into the script:

#! /bin/bash

case $1 in
"start"){
        for i in hadoop101
        do
                echo " -------- starting collection Flume on $i --------"
                ssh $i "nohup /opt/modules/flume/bin/flume-ng agent --name a1 --conf /opt/modules/flume/conf/ --conf-file /opt/datas/calllogs/flume1/flume1.conf &"
        done
};;
"stop"){
        for i in hadoop101
        do
                echo " -------- stopping collection Flume on $i --------"
                ssh $i "ps -ef | grep flume1 | grep -v grep | awk '{print \$2}' | xargs kill -9"
        done

};;
esac

Note: nohup keeps the process running after you log out or close the terminal; the name means "no hang up".
2) Make the script executable:

[hadoop@hadoop101 bin]$ chmod +x calllog-flume1.sh

3) Start the flume1 agent with the script:

[hadoop@hadoop101 modules]$ calllog-flume1.sh start

4) Stop the flume1 agent with the script:

[hadoop@hadoop101 modules]$ calllog-flume1.sh stop

4.4 Kafka Environment Preparation

      | Hadoop101 | Hadoop102 | Hadoop103
Kafka | Kafka     | Kafka     | Kafka

4.4.1 Kafka Cluster Start/Stop Script

1) Create the script kafka.sh in /home/hadoop/bin:

[hadoop@hadoop101 bin]$ vim kafka.sh

Write the following content into the script:

#! /bin/bash

case $1 in
"start"){
        for i in hadoop101 hadoop102 hadoop103
        do
                echo " -------- starting Kafka on $i --------"
                # Export the JMX port for Kafka Manager monitoring

                ssh $i "export JMX_PORT=9988 && /opt/modules/kafka/bin/kafka-server-start.sh -daemon /opt/modules/kafka/config/server.properties "
        done
};;
"stop"){
        for i in hadoop101 hadoop102 hadoop103
        do
                echo " -------- stopping Kafka on $i --------"
                ssh $i "ps -ef | grep server.properties | grep -v grep | awk '{print \$2}' | xargs kill >/dev/null 2>&1 &"
        done
};;
esac

Note: the JMX port is exported before starting Kafka so that Kafka Manager can monitor the brokers later.
2) Make the script executable:

[hadoop@hadoop101 bin]$ chmod +x kafka.sh

3) Start the Kafka cluster with the script:

[hadoop@hadoop101 modules]$ kafka.sh start

4) Stop the Kafka cluster with the script:

[hadoop@hadoop101 modules]$ kafka.sh stop

4.4.2 List All Kafka Topics

[hadoop@hadoop101 kafka]$ bin/kafka-topics.sh --zookeeper hadoop101:2181 --list

4.4.3 Create Kafka Topics

Go to the /opt/modules/kafka/ directory and create the volte topic and the cdr topic.
1) Create the volte topic:

[hadoop@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop101:2181,hadoop102:2181,hadoop103:2181  --create --replication-factor 2 --partitions 3 --topic topic-volte  

2) Create the cdr topic:

[hadoop@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop101:2181,hadoop102:2181,hadoop103:2181  --create --replication-factor 2 --partitions 3 --topic topic-cdr
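
For reference, the same two topics can also be created programmatically with Kafka's AdminClient API, available from version 0.11 onward. A minimal sketch follows, assuming the kafka-clients 0.11 jar is on the classpath (the class name CreateCdrTopics is made up for this example); the shell commands above remain the way this project creates the topics.

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

// Illustrative alternative to the kafka-topics.sh commands above.
public class CreateCdrTopics {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
                "hadoop101:9092,hadoop102:9092,hadoop103:9092");
        AdminClient admin = AdminClient.create(props);
        try {
            // 3 partitions and replication factor 2, matching the commands above.
            admin.createTopics(Arrays.asList(
                    new NewTopic("topic-volte", 3, (short) 2),
                    new NewTopic("topic-cdr", 3, (short) 2)))
                 .all().get(); // block until the brokers confirm creation
        } finally {
            admin.close();
        }
    }
}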

4.4.4 Delete Kafka Topics

[hadoop@hadoop101 kafka]$ bin/kafka-topics.sh --delete --zookeeper hadoop101:2181,hadoop102:2181,hadoop103:2181 --topic topic-volte

[hadoop@hadoop101 kafka]$ bin/kafka-topics.sh --delete --zookeeper hadoop101:2181,hadoop102:2181,hadoop103:2181 --topic topic-cdr

4.4.5 Produce Messages

[hadoop@hadoop101 kafka]$ bin/kafka-console-producer.sh \
--broker-list hadoop101:9092 --topic topic-volte
>hello world
>hadoop  hadoop

4.4.6 Consume Messages

[hadoop@hadoop102 kafka]$ bin/kafka-console-consumer.sh \
--bootstrap-server hadoop101:9092 --from-beginning --topic topic-volte

--from-beginning: reads all the data already in the topic from the start. Include it or not depending on the business scenario.
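
A quick programmatic check is also possible with the standard Java consumer API. A minimal sketch follows (the class name VolteTopicCheck and the group id volte-check are made up for this example); auto.offset.reset=earliest plays the role of --from-beginning for a new consumer group.

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

// Illustrative console check: prints whatever lands in topic-volte.
public class VolteTopicCheck {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop101:9092,hadoop102:9092,hadoop103:9092");
        props.put("group.id", "volte-check");        // arbitrary test group
        props.put("auto.offset.reset", "earliest");  // read from the beginning for a new group
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("topic-volte"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000L); // 0.11-style poll
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record.value());
                }
            }
        }
    }
}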

4.4.7 Describe a Topic

[hadoop@hadoop101 kafka]$ bin/kafka-topics.sh --zookeeper hadoop101:2181 \
--describe --topic topic-volte

4.5 Flume Consumes Kafka Data and Writes to HDFS

                       | Hadoop101 | Hadoop102 | Hadoop103
Flume (Kafka consumer) |           |           | Flume

Configuration analysis

Flume configuration details
(1) Create the flume2.conf file in the /opt/datas/calllogs/flume2/ directory on hadoop103:

[hadoop@hadoop103 flume2]$ vim flume2.conf

Add the following content to the file:

## Components
a1.sources=r1 r2
a1.channels=c1 c2
a1.sinks=k1 k2

## source1
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = hadoop101:9092,hadoop102:9092,hadoop103:9092
a1.sources.r1.kafka.zookeeperConnect = hadoop101:2181,hadoop102:2181,hadoop103:2181
a1.sources.r1.kafka.topics=topic-volte

## source2
a1.sources.r2.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r2.batchSize = 5000
a1.sources.r2.batchDurationMillis = 2000
a1.sources.r2.kafka.bootstrap.servers = hadoop101:9092,hadoop102:9092,hadoop103:9092
a1.sources.r2.kafka.zookeeperConnect = hadoop101:2181,hadoop102:2181,hadoop103:2181
a1.sources.r2.kafka.topics=topic-cdr

## channel1
a1.channels.c1.type=memory
a1.channels.c1.capacity=100000
a1.channels.c1.transactionCapacity=10000

## channel2
a1.channels.c2.type=memory
a1.channels.c2.capacity=100000
a1.channels.c2.transactionCapacity=10000

## sink1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /origin_data/calllogs/records/topic-volte/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = volte-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 30
a1.sinks.k1.hdfs.roundUnit = second

##sink2
a1.sinks.k2.type = hdfs
a1.sinks.k2.hdfs.path = /origin_data/calllogs/records/topic-cdr/%Y-%m-%d
a1.sinks.k2.hdfs.filePrefix = cdr-
a1.sinks.k2.hdfs.round = true
a1.sinks.k2.hdfs.roundValue = 30
a1.sinks.k2.hdfs.roundUnit = second

## Avoid producing large numbers of small files
a1.sinks.k1.hdfs.rollInterval = 30
a1.sinks.k1.hdfs.rollSize = 0
a1.sinks.k1.hdfs.rollCount = 0

a1.sinks.k2.hdfs.rollInterval = 30
a1.sinks.k2.hdfs.rollSize = 0
a1.sinks.k2.hdfs.rollCount = 0

## Write compressed output files (LZO)
a1.sinks.k1.hdfs.fileType = CompressedStream 
a1.sinks.k2.hdfs.fileType = CompressedStream 

a1.sinks.k1.hdfs.codeC = lzop
a1.sinks.k2.hdfs.codeC = lzop

## Wire sources, channels, and sinks together
a1.sources.r1.channels = c1
a1.sinks.k1.channel= c1

a1.sources.r2.channels = c2
a1.sinks.k2.channel= c2
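
Once the agent has written some output, the LZO files can be read back from HDFS to verify the whole pipeline. Below is a minimal, illustrative sketch (the class name ReadLzoSample is made up for this example); it needs the hadoop-lzo jar from 4.1.1 on the classpath and takes the path of one file under the sink directories above as an argument.

import java.io.BufferedReader;
import java.io.InputStreamReader;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;

// Illustrative read-back of one LZO file written by the HDFS sinks above.
public class ReadLzoSample {
    public static void main(String[] args) throws Exception {
        // Pass a file path listed by: hdfs dfs -ls /origin_data/calllogs/records/topic-cdr/<date>/
        Path file = new Path(args[0]);
        Configuration conf = new Configuration();
        try (FileSystem fs = FileSystem.get(conf)) {
            CompressionCodec codec = new CompressionCodecFactory(conf).getCodec(file);
            if (codec == null) {
                System.err.println("No codec found for " + file + " - is the hadoop-lzo jar on the classpath?");
                return;
            }
            try (BufferedReader reader = new BufferedReader(
                    new InputStreamReader(codec.createInputStream(fs.open(file))))) {
                // Print the first few records to confirm they arrived intact.
                for (int i = 0; i < 5; i++) {
                    String line = reader.readLine();
                    if (line == null) {
                        break;
                    }
                    System.out.println(line);
                }
            }
        }
    }
}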

4.5.2 Flume Exception Handling
1) Problem description: starting the consumer Flume throws the following exception:

ERROR hdfs.HDFSEventSink: process failed
java.lang.OutOfMemoryError: GC overhead limit exceeded

2) Solution:
(1) Add the following to /opt/modules/flume/conf/flume-env.sh on hadoop101:

export JAVA_OPTS="-Xms100m -Xmx2000m -Dcom.sun.management.jmxremote"

(2) Distribute the configuration to hadoop102 and hadoop103:

[hadoop@hadoop101 conf]$ xsync flume-env.sh

4.5.3 Log-Consumption Flume Start/Stop Script
1) Create the script calllog-flume2.sh in /home/hadoop/bin:

[hadoop@hadoop101 bin]$ vim calllog-flume2.sh

Write the following content into the script:

#! /bin/bash

case $1 in
"start"){
        for i in hadoop103
        do
                echo " -------- starting consumer Flume on $i --------"
                ssh $i "nohup /opt/modules/flume/bin/flume-ng agent --name a1 --conf /opt/modules/flume/conf/ --conf-file /opt/datas/calllogs/flume2/flume2.conf &"
        done
};;
"stop"){
        for i in hadoop103
        do
                echo " -------- stopping consumer Flume on $i --------"
                ssh $i "ps -ef | grep flume2 | grep -v grep | awk '{print \$2}' | xargs kill -9"
        done

};;
esac

2) Make the script executable:

[hadoop@hadoop101 bin]$ chmod +x calllog-flume2.sh

3) Start the flume2 agent with the script:

[hadoop@hadoop101 modules]$ calllog-flume2.sh start

4) Stop the flume2 agent with the script:

[hadoop@hadoop101 modules]$ calllog-flume2.sh stop

4.6 Collection Pipeline Start/Stop Script

1) Create the script cluster.sh in /opt/datas/calllogs:

[hadoop@hadoop101 calllogs]$ vim cluster.sh

Write the following content into the script:

#! /bin/bash

case $1 in
"start"){
        echo " -------- starting the cluster --------"

        echo " -------- starting the Hadoop cluster --------"
        /opt/modules/hadoop-2.7.2/sbin/start-dfs.sh 
        ssh hadoop102 "/opt/modules/hadoop-2.7.2/sbin/start-yarn.sh"

        # Start the Zookeeper cluster
        zk.sh start

        # Start the collection Flume agent
        calllog-flume1.sh start

        # Start the Kafka cluster
        kafka.sh start

        sleep 4s;

        # Start the consumer Flume agent
        calllog-flume2.sh start
};;
"stop"){
        echo " -------- stopping the cluster --------"

        # Stop the consumer Flume agent
        calllog-flume2.sh stop

        # Stop the Kafka cluster
        kafka.sh stop

        sleep 4s;

        # Stop the collection Flume agent
        calllog-flume1.sh stop

        # Stop the Zookeeper cluster
        zk.sh stop

        echo " -------- stopping the Hadoop cluster --------"
        ssh hadoop102 "/opt/modules/hadoop-2.7.2/sbin/stop-yarn.sh"
        /opt/modules/hadoop-2.7.2/sbin/stop-dfs.sh 
};;
esac

2) Make the script executable:

[hadoop@hadoop101 calllogs]$ chmod +x cluster.sh

3) Start the whole collection pipeline:

[hadoop@hadoop101 calllogs]$ ./cluster.sh start

4) Stop the whole collection pipeline:

[hadoop@hadoop101 calllogs]$ ./cluster.sh stop