工作中的一个storm实时流计算需求案例

开门见山,先说一说业务需求背景吧。两个月前在老大的要求下着手公司的一个storm实时流计算需求,简单的说,就是把C终端发来的数据进行业务处理,在kafka和storm中来回的进行计算,把最终的结果数据持久化到redis中作为数据源供后端调用。作为技术小白当时的心情是崩溃的(),除了之前对各个组件有一些简单的了解外,整体还停留在hello world层面,各组件间的整合对本小白来说难度也就不言而喻了。没有相关的知识储备,面对这个需求还真有点不知所措,尽管只是简单的算数运算,但是小白也表示压力山大。没办法,硬着头皮上吧,看了看相关的视频资料,总算在师父的指点下,磕磕绊绊,跑通了整个流程,虽然计算的准确性还有问题,小白表示收获还是蛮大的。

废话少说,直奔正题。

【相关框架】
业务主要涉及的技术不多,大体符合常规的实时流计算架构模型,strom + kafka + redis,所以需要先在本机环境搭建这几个环境,另外,zookeeper环境也必不可少(kafka的消息主题都存放在zookeeper中)

【运行环境】

jdk1.7
zookeeper-3.4.5
storm-0.9.2
kafka-2.9
redis-3.2.3

【架构】
先借用一张最常见的strom实时流分析通用模型设计


strom流分析通用架构.jpg

【本业务模型】

业务模型.png

【数据流向】

数据流向.png

这里值得注意的是,常规设计一般均在strom处理完数据后直接从bolt将数据发送给redis持久层,之所以本业务没有直接从bolt流向redis而是分主题转向kafka,再从kafka分主题发送给redis是因为数据源发送数据的频率太高(大概3-5次/s),老大说redis会承受不住,测试时已验证,具体原因可能跟redis的读取频率有关,这里是单节点,如果是redis集群环境会更好点儿。至于模型设计是灵活的,根据具体业务酌情考虑,没有规定非得按某个模型来,符合实际业务即可。

先奉上一些模拟数据吧,这里由于是local环境,所以LZ提前在本地收录了一些模拟数据存放在TXT文件中,利用kafka逐行读取数据并发送至指定topic来模拟数据源。数据格式如下,每一条消息有27个字段,每个字段代表不同的含义。
【模拟数据】

16 91 16777216 0 17 6 7 15 41 46 535 6.158485 1.813451 0.000000 -1068 8496 13572 28 276 0 1597 100 0 1496821304 436028
16 91 16777216 0 17 6 7 15 41 46 785 5.917774 1.716683 0.000000 -1064 8392 13544 109 254 -19 1598 100 0 1496821304 686031
16 91 16777216 0 17 6 7 15 41 47 35 5.090148 1.145671 0.000000 0 0 0 0 0 0 0 0 0 1496821304 935960
16 91 16777216 0 17 6 7 15 41 47 285 6.013670 1.660154 0.000000 -1096 8432 13700 72 314 12 1599 100 0 1496821305 186008
16 91 16777216 0 17 6 7 15 41 47 535 5.637641 1.650586 0.000000 -1080 8468 13468 23 373 -10 1600 100 0 1496821305 436129

【项目结构】

2017-06-30_173218.png

【代码】

【pom.xml】注意:引入的jar版本要与Linux下安装的版本保持一致

<dependencies>
    <!-- storm相关包 -->
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-core</artifactId>
        <version>0.9.2</version>
    </dependency>
    <dependency>
           <groupId>org.apache.storm</groupId>
           <artifactId>storm-kafka</artifactId>
           <version>0.9.2</version>
    </dependency>

    <!-- kafka相关包 -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka</artifactId>
        <version>2.9.0</version>
        <exclusions>
            <exclusion>
                <groupId>org.apache.zookeeper</groupId>
                <artifactId>zookeeper</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
            <exclusion>
                <groupId>log4j</groupId>
                <artifactId>log4j</artifactId>
            </exclusion>
        </exclusions>
    </dependency>

<!-- redis相关包 -->
       <dependency>
           <groupId>redis.clients</groupId>
           <artifactId>jedis</artifactId>
           <version>2.8.1</version>
       </dependency>
       <dependency>
           <groupId>org.apache.commons</groupId>
           <artifactId>commons-pool2</artifactId>
           <version>3.2.3</version>
       </dependency>
   </dependencies>

启动虚拟机环境(redis,zookeeper,kafka)

#启动redis服务
./redis-server ./redis.conf

#启动zookeeper服务
./zkServer.sh start

#启动kafka服务
./kafka-server-start.sh -daemon ../config/server.properties

#jps检查进程

#开启总数据消费(这里主题设为:htb_position_test)
./kafka-console-consumer.sh --zookeeper localhost:2181 --topic htb_position_test --from-beginning
2017-06-30_163546.png

【redadTxt_KafkaProduce.java】此类读取指定文件并发送消息到对应主题,模拟数据源

/**
 * kafka生产者类
 * @author lvfang
 */
public class redadTxt_KafkaProduce extends Thread {

    private String topic;//主题
    private String src;//数据源
    
    public redadTxt_KafkaProduce(String topic){  
        this.topic = topic;        
    } 
    
    public redadTxt_KafkaProduce(String topic,String src){  
        this.topic = topic; 
        this.src = src;
    }
    
    //创建生产者
    private Producer createProducer(){
        Properties properties = new Properties();
        //zookeeper单节点
        properties.put("zookeeper.connect","192.168.1.201:2181");
        properties.put("serializer.class", StringEncoder.class.getName());  

        //kafka单节点
        properties.put("metadata.broker.list", "192.168.1.201:9092");
        properties.put("advertised.host.name", "192.168.1.201");
        return new Producer<Integer, String>(new ProducerConfig(properties)); 
    }
    
    @Override
    public void run() {
        BufferedReader br = null;
        try {
            br = new BufferedReader(new FileReader(src));
            // 创建生产者
            Producer producer = createProducer();

            String line = null;
            // 循环发送消息到kafka
            while ((line = br.readLine()) != null) {        
                producer.send(new KeyedMessage<Integer, String>(topic,line + "\n"));
                
                // 发送消息的时间间隔,一秒发送3此
                Thread.sleep(333);
            }
        } catch (Exception e) {
        } finally {
            try {
                if (br != null) br.close();
            } catch (IOException e) {}
        }
    }
    
    //---------------------主方法----------------------------
    public static void main(String[] args) {
        // 使用kafka集群中创建好的主题 test  
        new redadTxt_KafkaProduce("htb_position_test","D:/testdata/htb_position_test_data.txt").start(); 
    }

启动redadTxt_KafkaProduce线程发送数据,并去linux终端查看,确保数据在发送

2017-06-30_164402.png

【HdtasInfo.java】javaBean,数据承载容器类

/**
 * @author lvfang
 * @create 2017-06-09 13:57
 * @desc 数据容器bean
 **/
public class HdtasInfo {
    //协议类型
    public static final String PROTOCOL_TYPE = "protocol_type";
    //场地ID
    public static final String FIELD_ID = "field_id";
    //主设备ID
    public static final String UWB_ID = "uwb_id";
    //护腿板ID
    public static final String SIGN_ID = "sign_id";
    // 年 月 日   时 分 秒  毫秒
    public static final String YEAR = "year";
    public static final String MONTH = "month";
    public static final String DAY = "day";
    public static final String HOUR = "hour";
    public static final String MINUTE = "minute";
    public static final String SECOND = "second";
    public static final String MILLISECOND = "millisecond";
    //定位精度  X Y Z
    public static final String X = "x";
    public static final String Y = "y";
    public static final String Z = "z";
    //加速度 X Y Z
    public static final String A_SPEED_X = "a_speed_x";
    public static final String A_SPEED_Y = "a_speed_y";
    public static final String A_SPEED_Z = "a_speed_z";
    //陀螺仪 X Y Z
    public static final String GYROSCOPE_X = "gyroscope_x";
    public static final String GYROSCOPE_Y = "gyroscope_y";
    public static final String GYROSCOPE_Z = "gyroscope_z";
    //心率
    public static final String HEART_RATE = "heart_rate";
    //电池电量
    public static final String ELECTRIC = "electric";
    //电池充电状态  1:充电  0:放电
    public static final String CHARGING_STATUS = "charging_status";
    //Unix时间戳  秒
    public static final String SERVER_ACCEPT_TIME_S = "server_accept_time_s";
    //Unix时间戳  纳秒
    public static final String SERVER_ACCEPT_TIME_N = "server_accept_time_n";
}

【KafkaUtil.java】由于全过程要多次创建kafka生产消费者,所以单提出工具类

public class KafkaUtil {
    
    public static final String HDTAS_SPOUT = "hdtasSpout";
    public static final String HDTAS_DATA_BOLT = "dataBolt";
    public static final String HDTAS_SPEED_BOLT = "hdtas_speed_bolt";
    public static final String HDTAS_AGILE_BOLT = "hdtas_agile_bolt";
    public static final String HDTAS_BATTERY_BOLT = "hdtas_battery_bolt";
    public static final String HDTAS_DISTANCE_BOLT = "hdtas_distance_bolt";
    public static final String HDTAS_HEARTRATE_BOLT = "hdtas_heartrate_bolt";
    public static final String HDTAS_POSITION_BOLT = "hdtas_psoition_bolt";
    
    public static final String HDTAS_SPEED_GROOPID = "hdtas_speed_groopId";
    public static final String HDTAS_AGILE_GROOPID = "hdtas_agile_groopId";
    public static final String HDTAS_BATTERY_GROOPID = "hdtas_battery_groopId";
    public static final String HDTAS_DISTANCE_GROOPID = "hdtas_distance_groopId";
    public static final String HDTAS_HEARTRATE_GROOPID = "hdtas_heartrate_groopId";
    public static final String HDTAS_POSITION_GROOPID = "hdtas_position_groopId";

    // 创建生产者
    public static Producer<Integer, String> createProducer() {
        Properties properties = new Properties();
        // zookeeper单节点
        properties.put("zookeeper.connect", "192.168.1.201:2181");
        properties.put("serializer.class", StringEncoder.class.getName());

        // kafka单节点
        properties.put("metadata.broker.list", "192.168.1.201:9092");
        
        //不设置可能会报错:kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
        properties.put("advertised.host.name", "192.168.1.201");
        return new Producer<Integer, String>(new ProducerConfig(properties));
    }

    // 创建消费者
    public static ConsumerConnector createConsumer(String groupId) {
        Properties properties = new Properties();
        // 声明zookeeper集群链接地址
        properties.put("zookeeper.connect", "192.168.1.201:2181");
        
        // 必须要使用别的组名称, 如果生产者和消费者都在同一组,则不能访问同一组内的topic数据
        properties.put("group.id", groupId);
        return Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));
    }
}

【RedisUtil.java】redis数据源工具类

public class RedisUtil {

private static JedisPool pool = null;
    
  /**
   * @author lvfang
   * @create 2017-06-09 13:57
   * @desc 数据容器bean
   * @param ip
   * @param port
   * @return JedisPool
   **/
    public static JedisPool getPool() {
        if (pool == null) {
            JedisPoolConfig config = new JedisPoolConfig();
            //控制一个pool可分配多少个jedis实例,通过pool.getResource()来获取;
            //如果赋值为-1,则表示不限制;如果pool已经分配了maxActive个jedis实例,则此时pool的状态为exhausted(耗尽)。
            config.setMaxActive(500);
            //控制一个pool最多有多少个状态为idle(空闲的)的jedis实例。
            config.setMaxIdle(5);
            //表示当borrow(引入)一个jedis实例时,最大的等待时间,如果超过等待时间,则直接抛出JedisConnectionException;
            config.setMaxWait(1000 * 100);
            //在borrow一个jedis实例时,是否提前进行validate操作;如果为true,则得到的jedis实例均是可用的;
            config.setTestOnBorrow(true);
            
            //public JedisPool(final ConfigpoolConfig, final String host, int port, int timeout, final String password, final int database)
            //public JedisPool(final ConfigpoolConfig, final String host, final int port, final int timeout)
            pool = new JedisPool(config, "reids主机IP", 端口,20000,"密码",0);
            
        }
        return pool;
    }

    /**
     * 返还到连接池
     * 
     * @param pool 
     * @param redis
     */
    public static void returnResource(JedisPool pool, Jedis redis) {
        if (redis != null) {
            pool.returnResourceObject(redis);
        }
    }
    
    public static void main(String[] args) {
        System.out.println(RedisUtil.getPool().getResource().ping());
    }
}

【Constant.java】特殊符号工具类

public class Constant {

    public static final char CHAR_BAR = '|';
    public static final char CHAR_SLASH = '/';
    public static final char CHAR_MINUS = '-';
    public static final char CHAR_TAB = '\t';
    public static final char CHAR_COMMA = ',';
    public static final char CHAR_UNDERLINE = '_';
    public static final char CHAR_CURLY_BRACKETS_LEFT = '{';
    public static final char CHAR_COLON = ':';
    public static final String STR_TAB = "\\t";
    public static final String STR_COMMA = ",";
    public static final String STR_EMPTY = "";
    public static final String STR_UNDERLINE = "_";

    public static final String TXT = ".txt";

    public static final String PLATFORM_TYPE_WEBSITE = "0";
    public static final String PLATFORM_TYPE_ANDROID = "1";
    public static final String PLATFORM_TYPE_IPHONE = "2";

    public static final int DEFAULT_NUM_WORKERS = 2;
    public static final int DEFAULT_BOLT_PARALLELISM_HINT = 1;
    public static final int DEFAULT_SPOUT_PARALLELISM_HINT = 1;
    public static final int DEFAULT_KAFKA_TOPICTHREAD_CAPACITY = 1;

    public static final int REDIS_KEY_EXPIRE_3_MONTH = 90 * 24 * 60 * 60;
}

【KafkaSpoutMain.java】此类是kafka与storm的整合,它即是kafka消息的消费者又是strom数据源的数据生产者,其从对应的topic接收消息,并作为storm数据的数据源。

/**
 * @author lvfang
 * @create 2017-06-09 13:57
 * @desc kafka整合storm 主程序入口
 **/
public class KafkaSpoutMain {
    
    // 主题与zk端口
    public static final String TOPIC = "htb_position_test";
    public static final String ZKINFO = "192.168.1.201:2181";

    private static final String HDTAS_SPOUT = "hdtasSpout";
    private static final String HDTAS_DATA_BOLT = "dataBolt";
    private static final String HDTAS_SPEED_BOLT = "hdtas_speed_bolt";
    private static final String HDTAS_AGILE_BOLT = "hdtas_agile_bolt";
    private static final String HDTAS_BATTERY_BOLT = "hdtas_battery_bolt";
    private static final String HDTAS_DISTANCE_BOLT = "hdtas_distance_bolt";
    private static final String HDTAS_HEARTRATE_BOLT = "hdtas_heartrate_bolt";
    private static final String HDTAS_POSITION_BOLT = "hdtas_psoition_bolt";

    public static void main(String[] args) {
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        //创建zk主机
        ZkHosts zkHosts = new ZkHosts(ZKINFO);
        //创建spout
        SpoutConfig spoutConfig = new SpoutConfig(zkHosts, TOPIC, "","KafkaSpout");
        //整合kafkaSpout
        KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

        //设置storm数据源为kafka整合storm的kafkaSpout
        topologyBuilder.setSpout(HDTAS_SPOUT, kafkaSpout, 1);
        //流向dataBolt进行空格分割处理(总处理,同时分发给多个bolt)
        topologyBuilder.setBolt(HDTAS_DATA_BOLT, new DataBolt(), 1).shuffleGrouping(HDTAS_SPOUT);
        //灵敏度数据流
        topologyBuilder.setBolt(HDTAS_AGILE_BOLT,new HdtasAgileBolt(HDTAS_AGILE_BOLT),1).fieldsGrouping(HDTAS_DATA_BOLT,new Fields(HdtasInfo.SIGN_ID));
        //速度数据流
        topologyBuilder.setBolt(HDTAS_SPEED_BOLT,new HdtasSpeedBolt(HDTAS_SPEED_BOLT),1).fieldsGrouping(HDTAS_DATA_BOLT,new Fields(HdtasInfo.SIGN_ID));
        //电量数据流
        topologyBuilder.setBolt(HDTAS_BATTERY_BOLT,new HdtasBatteryBolt(HDTAS_BATTERY_BOLT),1).fieldsGrouping(HDTAS_DATA_BOLT,new Fields(HdtasInfo.SIGN_ID));
        //距离数据流
        topologyBuilder.setBolt(HDTAS_DISTANCE_BOLT,new HdtasDistanceBolt(HDTAS_DISTANCE_BOLT),1).fieldsGrouping(HDTAS_DATA_BOLT,new Fields(HdtasInfo.SIGN_ID));
        //心率数据流
        topologyBuilder.setBolt(HDTAS_HEARTRATE_BOLT,new HdtasHeartrateBolt(HDTAS_HEARTRATE_BOLT),1).fieldsGrouping(HDTAS_DATA_BOLT,new Fields(HdtasInfo.SIGN_ID));
        //坐标数据流
        topologyBuilder.setBolt(HDTAS_POSITION_BOLT,new HdtasPositionBolt(HDTAS_POSITION_BOLT),1).fieldsGrouping(HDTAS_DATA_BOLT,new Fields(HdtasInfo.SIGN_ID));
        
        Config config = new Config();
        config.setNumWorkers(1);

        if (args.length > 0) {
            try {
                StormSubmitter.submitTopology(args[0], config,topologyBuilder.createTopology());
            } catch (Exception e) {}
        } else {
            LocalCluster localCluster = new LocalCluster();
            localCluster.submitTopology("HDTAS", config,topologyBuilder.createTopology());
        }

    }
}
2017-06-30_165146.png

KafkaSpoutMain 类主要编写的数据在各个bolt间的流程逻辑,通过上图不难看出spout接收到数据后先整体发送给DataBolt,DataBolt进行数据切分后在同时发送给各个bolt,各个bolt进行各自的业务处理。组分割字段是userID,进程数酌情设置,单节点就设置1,集群环境设置节点个数

【DataBolt.java】总bolt,进行数据切分

/**
 * @author lvfang
 * @create 2017-06-09 13:57
 * @desc 总Bolt,对数据进行分割处理
 **/
public class DataBolt extends BaseRichBolt {

    private OutputCollector collector;

    public Map<String,String> map;

    /**
     * 业务操作,数据处理(这里进行分割发送)
     * @param tuple
     */
    @Override
    public void execute(Tuple tuple) {
        String string = new String((byte[]) tuple.getValue(0));

        String[] datas = string.split(" ");//按空格切分
   
        if(datas.length==25){
            this.collector.emit(new Values(datas[0],datas[1],datas[2],datas[3],datas[4],datas[5],datas[6],datas[7],datas[8],datas[9],
                    datas[10],datas[11],datas[12],datas[13],datas[14],datas[15],datas[16],datas[17],datas[18],datas[9],
                    datas[20],datas[21],datas[22],datas[23],datas[24]));
        }
    }

    /**
     * 初始化方法
     * @param map
     * @param topologyContext
     * @param outputCollector
     */
    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.collector = outputCollector;
    }

    /**
     * 指定流向,标注流向字段
     * @param declarer
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(
                new Fields(HdtasInfo.PROTOCOL_TYPE,
                        HdtasInfo.FIELD_ID,
                        HdtasInfo.UWB_ID,
                        HdtasInfo.SIGN_ID,
                        HdtasInfo.YEAR,
                        HdtasInfo.MONTH,
                        HdtasInfo.DAY,
                        HdtasInfo.HOUR,
                        HdtasInfo.MINUTE,
                        HdtasInfo.SECOND,
                        HdtasInfo.MILLISECOND,
                        HdtasInfo.X,
                        HdtasInfo.Y,
                        HdtasInfo.Z,
                        HdtasInfo.A_SPEED_X,
                        HdtasInfo.A_SPEED_Y,
                        HdtasInfo.A_SPEED_Z,
                        HdtasInfo.GYROSCOPE_X,
                        HdtasInfo.GYROSCOPE_Y,
                        HdtasInfo.GYROSCOPE_Z,
                        HdtasInfo.HEART_RATE,
                        HdtasInfo.ELECTRIC,
                        HdtasInfo.CHARGING_STATUS,
                        HdtasInfo.SERVER_ACCEPT_TIME_S,
                        HdtasInfo.SERVER_ACCEPT_TIME_N));
    }
}

以上DataBolt将数据切分后以字段标示并发出,由各个bolt去自行获取,由于bolt较多,这里就只提供一个bolt的代码,其他bolt等同,只是个业务bolt获取的数据不同,以TLY业务处理为例

【HdtasAgileBolt.java】需要获取到坐标z,y,z,time,userId,fieldId等数据并进行处理

/**
 * @author lvfang
 * @create 2017-06-09 13:57
 * @desc 灵敏度Bolt
 **/
public class HdtasAgileBolt extends BaseRichBolt {
    
    private String topic;
    private StringBuilder sb;
    private Producer<Integer, String> producer;
    private OutputCollector collector;
    
    public HdtasAgileBolt(String topic){
        this.topic = topic;
    }

    @Override
    public void prepare(Map config, TopologyContext context, OutputCollector collector) {
        //this.topic = KafkaUtil.HDTAS_AGILE_BOLT;
        producer = new KafkaUtil().createProducer();
        this.collector = collector;
    }
    

    @Override
    public void execute(Tuple input) {
        //这里进行灵敏度数据操作
        String userId = input.getStringByField(HdtasInfo.SIGN_ID);
        String fieldId = input.getStringByField(HdtasInfo.FIELD_ID);
        String x = input.getStringByField(HdtasInfo.X);
        String y = input.getStringByField(HdtasInfo.Y);
        String z = input.getStringByField(HdtasInfo.Z);
        String time = input.getStringByField(HdtasInfo.SERVER_ACCEPT_TIME_S);
        
        sb = new StringBuilder();
        sb.append(userId).append(Constant.STR_UNDERLINE)
            .append(fieldId).append(Constant.STR_UNDERLINE)
            .append(x).append(Constant.STR_UNDERLINE)
            .append(y).append(Constant.STR_UNDERLINE)
            .append(z).append(Constant.STR_UNDERLINE)
            .append(time);

//        this.message = userId + "_" + fieldId + "_" + x + "_" + y + "_" + z + "_" + time;      
        
//发送给指定的kafka主题
        producer.send(new KeyedMessage<Integer, String>(topic,sb.toString()));   
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
      //这里没有流转至下一个bolt,自然不用重写。
    }
}

要注意的是,实现bolt的方式有好几种,实现IRichBolt,或者继承BaseRichBolt等,这里选用后者,因为后者会自动ACK(老大告诉的),貌似是storm消息保障机制.
由于需要将bolt处理的数据流转给kafka,所有在上bolt的初始化方法prepare中初始化好了一个kafka-producer,在数据进行业务处理完后,将数据发送给对应的主题,这里当然是TLY主题hdtas_agile_bolt。启动kafkaspout主方法进行数据处理,我们可以去虚拟机终端查看,也可以自行写kafka-customer去消费对应topic,小白是从Linux终端查看的

./kafka-console-consumer.sh --zookeeper localhost:2181 --topic hdtas_agile_bolt --from-beginning
2017-06-30_171501.png

其他bolt等同,这时候该书写TLY的主题消费类,消费后持久化

【HdtasAgileCusumer.java】此类主要消费hdtas_agile_bolt主题数据,并进行持久化,这里注意在存储数据到redis中要设置过去时间,由于redis的数据持久化特性,如果不设过去时间,会造成存储数据文件过大

/**
 * kafka消费者类
 * @author lvfang
 *
 */
public class HdtasAgileCusumer extends Thread {

    private String topic;//主题
    private static JedisPool pool;
    private Jedis jedis;
    private String[] messages;
    private String key;
    private String field;
    private String value;
    
    static {
        pool = RedisUtil.getPool();
    }
    
    public HdtasAgileCusumer(String topic){  
        super();  
        this.topic = topic;  
    } 
    
    @Override
    public void run() {
        //创建消费者
        ConsumerConnector consumer = KafkaUtil.createConsumer(KafkaUtil.HDTAS_AGILE_GROOPID);//createConsumer();  
        //主题数map
        Map<String, Integer> topicCountMap = new HashMap<>();
        // 一次从topic主题中获取一个数据 
        topicCountMap.put(topic, 1);
        //创建一个获取消息的消息流
        Map<String,List<KafkaStream<byte[], byte[]>>> messageStreams = consumer.createMessageStreams(topicCountMap);
        // 获取每次接收topic主题到的这个数据  
        KafkaStream<byte[], byte[]> stream = messageStreams.get(topic).get(0);
        ConsumerIterator<byte[], byte[]> iterator = stream.iterator();
        
        try {
            jedis = pool.getResource();
            //循环打印
            while (iterator.hasNext()) {
                String message = new String(iterator.next().message()); 
                
                System.out.println("接收到: " + message);
                //接收到的数据格式:userId + "_" + fieldId + "_" + x + "_" + y + "_" + z + "_" + time;
                messages = message.split("_");
                
                if(messages.length == 6){
                    key = "hiseeHTLY_" + messages[1];
                    field = messages[0] + "_" + messages[5];
                    value = messages[2] + "_" + messages[3] + "_" + messages[4];
                    
                    jedis.hset(key, field, value);
                    jedis.expire(key, 60);
                }           
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            RedisUtil.returnResource(pool, jedis);
        }
    }
        
    public static void main(String[] args) {
        // 使用kafka集群中创建好的主题 test 
        new HdtasAgileCusumer(KafkaUtil.HDTAS_AGILE_BOLT).start();  
    }
}

这时我们可以启动HDTAS_AGILE_BOLT的消费类,并去redis查看是否持久化成功

2017-06-30_172531.png
2017-06-30_172709.png

我们可以看到数据持久化成功,其他主题消费等同,整个数据流向很简单,只过了两次bolt,

来一张全图:

2017-06-30_174438.png
2017-06-30_174618.png

流计算真心属于入门水准,欢迎拍砖。
最后真心感谢师父的指点 ! ! !

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,456评论 5 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,370评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,337评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,583评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,596评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,572评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,936评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,595评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,850评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,601评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,685评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,371评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,951评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,934评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,167评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 43,636评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,411评论 2 342

推荐阅读更多精彩内容