Advanced Storm (Part 2): A Trident Order Statistics Demo

Project requirements: collect order details and analyze the data:
1. Total site sales and order count per day (also per hour and per minute)
2. The same metrics by region (country, province, city) and by time slot

1. Preparing the test data

package com.xxx.storm.test;

import java.util.HashMap;
import java.util.Map;
import java.util.Random;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;

/**
 * Order data generator.
 */
public class OrderDataGenerator {

    // an order record has the form:
    // "timestamp" "consumer" "productName" "price" "country" "province" "city"

    private static final String[] CONSUMERS = { "Merry", "John", "Tom", "Candy", 
            "张三丰", "周芷若", "张无忌", "令狐冲", "独孤九剑","郭靖", "杨过" };

    private static final String[] PRODUCT_NAMES = { "华为笔记本", "iPad", "苹果电脑", "iPhone", 
            "乐视TV", "美的空调", "小米2", "魅族" };

    private static final Map<String, Double> PRODUCT_PRICE = new HashMap<String, Double>();
    static {
        PRODUCT_PRICE.put("华为笔记本", 2345.89);
        PRODUCT_PRICE.put("iPad", 3567.78);
        PRODUCT_PRICE.put("苹果电脑", 23456.12);
        PRODUCT_PRICE.put("iPhone", 6732.81);
        PRODUCT_PRICE.put("乐视TV", 1234.76);
        PRODUCT_PRICE.put("美的空调", 1260.32);
        PRODUCT_PRICE.put("小米2", 2390.81);
        PRODUCT_PRICE.put("魅族", 3456.72);
    }

    private static final String[] ADDRESSES = { "中国,上海,浦东", "中国,上海,杨浦", "中国,福建,厦门", 
            "中国,浙江,杭州", "中国,江苏,苏州", "中国,北京,通州",
            "中国,北京,海淀" };

    // "timestamp" "consumer" "productName" "price" "country" "province" "city"
    /**
     * 模拟生成订单消费记录
     * 
     * @return
     */
    public static String generateOrderRecord() {
        long timestamp = System.currentTimeMillis();
        StringBuilder sbuilder = new StringBuilder("\""+timestamp + "\"");

        Random r = new Random();
        String consumer = CONSUMERS[r.nextInt(CONSUMERS.length)];
        sbuilder.append(" \"" + consumer + "\"");
        String productName = PRODUCT_NAMES[r.nextInt(PRODUCT_NAMES.length)];
        sbuilder.append(" \"" + productName + "\"");
        double price = PRODUCT_PRICE.get(productName);
        sbuilder.append(" \"" + price + "\"");
        String address = ADDRESSES[r.nextInt(ADDRESSES.length)];
        String[] addrInfos = address.split(",");
        sbuilder.append(" \"" + addrInfos[0]  + "\"");
        sbuilder.append(" \"" + addrInfos[1] + "\"");
        sbuilder.append(" \"" + addrInfos[2] + "\"");

        return sbuilder.toString();
    }

    public static void main(String[] args) {
        KafkaProducer kafkaProducer = new KafkaProducer();
        Producer<String, String> producer = kafkaProducer.getKafkaProducer("hadoop1:9092");
        for (;;) {
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            
            String msgKey = System.currentTimeMillis() + "";
            String msg = OrderDataGenerator.generateOrderRecord();
            // if the topic does not exist it is created automatically, with the broker
            // defaults (replication-factor 1, num.partitions 1)
            KeyedMessage<String, String> data = kafkaProducer.getKeyedMessage("test", msgKey, msg);
            
            kafkaProducer.sendMessage(producer, data);
        }
        // unreachable because of the infinite loop above:
        // kafkaProducer.close(producer);
    }
}
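
For reference, one generated record looks like the following (a made-up but representative sample; consumer, product and address are chosen at random, and the price comes from the PRODUCT_PRICE map):

"1472368083000" "张三丰" "iPad" "3567.78" "中国" "江苏" "苏州"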

package com.xxx.storm.test;

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

/**
 * Kafka producer helper.
 */
public class KafkaProducer {
    
    /**
     * Create a Kafka producer.
     * @param brokerList comma-separated list of brokers, e.g. "hadoop1:9092"
     * @return
     */
    public Producer<String,String> getKafkaProducer(String brokerList){
        // producer configuration
        Properties props = new Properties();
        props.put("metadata.broker.list", brokerList);
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        // key.serializer.class defaults to serializer.class
        props.put("key.serializer.class", "kafka.serializer.StringEncoder");
        // optional; if unset, the default partitioner is used
        props.put("partitioner.class", "com.xxx.storm.test.KafkaPartitioner");
        // enables acknowledgements; otherwise sends are fire-and-forget and data may be lost.
        // Valid values are 0, 1 and -1; see
        // http://kafka.apache.org/08/configuration.html
        props.put("request.required.acks", "1");
        ProducerConfig config = new ProducerConfig(props);

        // create the producer
        Producer<String, String> producer = new Producer<String, String>(config);
        
        return producer;
    }
    
    /**
     * Close the Kafka producer.
     * @param producer
     */
    public void close(Producer<String,String> producer){
        producer.close();
    }
    
    /**
     * Send a message to Kafka.
     * @param producer
     * @param data
     */
    public void sendMessage(Producer<String, String> producer, KeyedMessage<String, String> data) {
        producer.send(data);
    }
    
    /**
     * Build a keyed message for the given topic.
     * @param topic
     * @param msgKey
     * @param msgContent
     * @return
     */
    public KeyedMessage<String, String> getKeyedMessage(String topic, String msgKey, String msgContent) {
        return new KeyedMessage<String, String>(topic, msgKey, msgContent);
    }
}

package com.xxx.storm.test;

import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;

/**
 * Custom partitioner: keys of the form "xxx.N" are routed by the numeric
 * suffix N; all other keys by the length of their string form.
 */
public class KafkaPartitioner implements Partitioner {
    
    public KafkaPartitioner(VerifiableProperties properties) {
    }

    @Override
    public int partition(Object obj, int numPartitions) {
        int partition = 0;  
        if (obj instanceof String) {
            String key=(String)obj;  
            int offset = key.lastIndexOf('.');  
            if (offset > 0) {  
                partition = Integer.parseInt(key.substring(offset + 1)) % numPartitions;  
            }  
        }else{  
            partition = obj.toString().length() % numPartitions;  
        }  
        return partition;
    }
}
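
As a quick sanity check of the routing logic, the hypothetical snippet below (not part of the original demo) calls partition() directly. Note that the producer above uses millisecond timestamps as message keys; they contain no '.', so with this partitioner every message lands in partition 0.

package com.xxx.storm.test;

public class KafkaPartitionerCheck {

    public static void main(String[] args) {
        // the constructor ignores its argument, so null is fine for a local check
        KafkaPartitioner partitioner = new KafkaPartitioner(null);

        // timestamp keys contain no '.', so they always map to partition 0
        System.out.println(partitioner.partition("1472368083000", 3)); // 0

        // a key of the form "xxx.N" is routed by N % numPartitions
        System.out.println(partitioner.partition("host.7", 3));        // 7 % 3 = 1
    }
}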

2. Reading the data from Kafka, computing the statistics, and storing the results in memory and in HBase

package com.xxx.storm.orderprocess;

import org.apache.storm.hbase.trident.state.HBaseMapState;

import com.xxx.storm.trident.PrintTestFilter;
import com.xxx.storm.trident.SplitFunction1;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.LocalDRPC;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.tuple.Fields;
import storm.kafka.BrokerHosts;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;
import storm.kafka.trident.OpaqueTridentKafkaSpout;
import storm.kafka.trident.TransactionalTridentKafkaSpout;
import storm.kafka.trident.TridentKafkaConfig;
import storm.trident.Stream;
import storm.trident.TridentState;
import storm.trident.TridentTopology;
import storm.trident.operation.builtin.Count;
import storm.trident.operation.builtin.FilterNull;
import storm.trident.operation.builtin.MapGet;
import storm.trident.operation.builtin.Sum;
import storm.trident.state.OpaqueValue;
import storm.trident.state.StateFactory;
import storm.trident.testing.MemoryMapState;

/**
 * Order processing topology.
 */
public class OrderProcessingTrident {
    
    private static final String SPOUT_ID = "kafkaSpout";
    
    public static void main(String[] args) {
        
        // build the TridentTopology
        
        TridentTopology tridentTopology = new TridentTopology();
        
        // read messages from Kafka with a Trident Kafka spout
        BrokerHosts hosts = new ZkHosts("hadoop1:2181");
        String topic = "test";
        TridentKafkaConfig config = new TridentKafkaConfig(hosts, topic);
        
        config.forceFromStart = false;
        config.scheme = new SchemeAsMultiScheme(new StringScheme());
        
        OpaqueTridentKafkaSpout opaqueTridentKafkaSpout = 
                new OpaqueTridentKafkaSpout(config);
        // the transactional variant is shown for comparison; the opaque spout is used below
        TransactionalTridentKafkaSpout transactionalTridentKafkaSpout = 
                new TransactionalTridentKafkaSpout(config);
        
        // {"str":"xxxxxxxx"}
        
        Stream stream = tridentTopology.newStream(SPOUT_ID, opaqueTridentKafkaSpout);
        
        //stream.each(new Fields("str"), new PrintTestFilter());
        
        // timestamp,yyyyMMddStr,yyyyMMddHHStr,yyyyMMddHHmmStr,consumer,productName,price,country,province,city
        Stream parsedStream = stream.each(new Fields("str"), new OrderParseFunction(),
                new Fields("timestamp","yyyyMMddStr","yyyyMMddHHStr","yyyyMMddHHmmStr",
                        "consumer","productName","price",
                        "country","province","city"))
//          .each(new Fields("str","timestamp","yyyyMMddStr","yyyyMMddHHStr",
//                  "yyyyMMddHHmmStr",
//                  "consumer","productName","price",
//                  "country","province","city"), new PrintTestFilter())
        ;
        
        // 1. total daily sales for the site
        // project away the fields that are no longer needed
        Stream partitionStatisticsStream = 
                parsedStream.project(new Fields("yyyyMMddStr","price"))
        // random repartitioning
        .shuffle()
        .groupBy(new Fields("yyyyMMddStr"))
        .chainedAgg()
        // partial sum of order amounts within each partition of the batch
        .partitionAggregate(new Fields("price"), new SaleSum(), 
                new Fields("saleTotalAmtOfPartDay"))
        // partial count of orders within each partition of the batch
        .partitionAggregate(new Count(), new Fields("numOrderOfPartDay"))
        .chainEnd()
        .parallelismHint(5)
        .toStream()
        ;
        // global running sales total per day
        TridentState saleAmtState =         
                partitionStatisticsStream.groupBy(new Fields("yyyyMMddStr"))
        .persistentAggregate(
                new MemoryMapState.Factory(),
                new Fields("saleTotalAmtOfPartDay"),new Sum(),
                new Fields("saleGlobalAmtOfDay"))
        ;
//      saleAmtState.newValuesStream()
//      .each(new Fields("yyyyMMddStr","saleGlobalAmtOfDay"), new PrintTestFilter());
//      ;
        // global running order count per day
        TridentState numOfSaleState = 
                partitionStatisticsStream.groupBy(new Fields("yyyyMMddStr"))
        .persistentAggregate(new MemoryMapState.Factory(), 
                new Fields("numOrderOfPartDay"), new Sum(),
                new Fields("numOrderGlobalOfDay"));
        
//      numOfSaleState.newValuesStream()
//      .each(new Fields("yyyyMMddStr","numOrderGlobalOfDay"), new PrintTestFilter());
        
        
        // a local DRPC server for in-process testing
        LocalDRPC localDRPC = new LocalDRPC();
        tridentTopology.newDRPCStream("saleAmtOfDay",localDRPC)
            .each(new Fields("args"), new SplitFunction1(),new Fields("requestDate"))
            .stateQuery(saleAmtState, new Fields("requestDate"),new MapGet(), 
                    new Fields("saleGlobalAmtOfDay1"))
            .project(new Fields("requestDate","saleGlobalAmtOfDay1"))
            .each(new Fields("saleGlobalAmtOfDay1"), new FilterNull())
            ;
        
        tridentTopology.newDRPCStream("numOrderOfDay",localDRPC)
        .each(new Fields("args"), new SplitFunction1(),new Fields("requestDate"))
        .stateQuery(numOfSaleState, new Fields("requestDate"),new MapGet(), 
                new Fields("numOrderGlobalOfDay1"))
        .project(new Fields("requestDate","numOrderGlobalOfDay1"))
        .each(new Fields("numOrderGlobalOfDay1"), new FilterNull())
        ;
        
        
        
        // 2. sales totals and order counts by region and hour (yyyyMMddHHStr)
        // available fields: "timestamp","yyyyMMddStr","yyyyMMddHHStr","yyyyMMddHHmmStr",
        //                   "consumer","productName","price","country","province","city"
        
        @SuppressWarnings("rawtypes")
        HBaseMapState.Options<OpaqueValue> opts = new HBaseMapState.Options<OpaqueValue>();
        opts.tableName ="saleTotalAmtOfAddrAndHour";
        opts.columnFamily ="cf";
        opts.qualifier = "sTAOAAH";
        
        // the HBase table must already exist; create it in the hbase shell with:
        // create 'saleTotalAmtOfAddrAndHour',{ NAME => 'cf' , VERSIONS => 1000}
        StateFactory factory = HBaseMapState.opaque(opts);
        
        TridentState saleTotalAmtOfAddrAndHourState = 
                parsedStream.project(new Fields("yyyyMMddHHStr",
                "price","country","province","city"))
        .each(new Fields("yyyyMMddHHStr","country","province","city")
                , new CombineKeyFun(), new Fields("addrAndHour"))
        .project(new Fields("addrAndHour","price"))
        .groupBy(new Fields("addrAndHour"))
        .persistentAggregate(factory,new Fields("price"),
                new Sum(), new Fields("saleTotalAmtOfAddrAndHour"));
        
        
        saleTotalAmtOfAddrAndHourState.newValuesStream()
        .each(new Fields("addrAndHour",
                "saleTotalAmtOfAddrAndHour"), new PrintTestFilter());
        
        
        tridentTopology.newDRPCStream("saleTotalAmtOfAddrAndHour", localDRPC)
            .each(new Fields("args"), new SplitFunction1(),new Fields("requestAddrAndHour"))
            .stateQuery(saleTotalAmtOfAddrAndHourState,new Fields("requestAddrAndHour"),
                    new MapGet(), new Fields("saleTotalAmtOfAddrAndHour"))
            //.project(new Fields("requestAddrAndHour","saleTotalAmtOfAddrAndHour"))
            //.each(new Fields("saleTotalAmtOfAddrAndHour"), new FilterNull())
            ;
        
        
        Config conf = new Config();
        
        if (args == null || args.length <= 0) {
            // local test mode
            LocalCluster localCluster = new LocalCluster();
            // the topology name must be unique
            localCluster.submitTopology("orderProcessingTrident", conf, tridentTopology.build());
            
            while(true){
                
                try {
                    Thread.sleep(10000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                String saleAmtResult = 
                        localDRPC.execute("saleAmtOfDay", "20160828 20160827");
                
                System.err.println("saleAmtResult=" +saleAmtResult);
                
                String numberOrderResult = 
                        localDRPC.execute("numOrderOfDay", "20160828 20160827");
                System.err.println("numberOrderResult=" + numberOrderResult);
                
                String saleTotalAmtOfAddrAndHourResult = 
                        localDRPC.execute("saleTotalAmtOfAddrAndHour", "苏州_江苏_中国_2016082815");
                
                System.err.println(saleTotalAmtOfAddrAndHourResult);
                
            }
        }else{
            try {
                StormSubmitter.submitTopology(args[0], conf, tridentTopology.build());
            } catch (AlreadyAliveException e) {
                e.printStackTrace();
            } catch (InvalidTopologyException e) {
                e.printStackTrace();
            }
        }

    }

}
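
When the local run is healthy, each localDRPC.execute call returns its results as a JSON-style string of result tuples. With the generator from section 1 feeding the topic, output along the following lines is plausible (the numbers are illustrative, not actual results):

saleAmtResult=[["20160828",152340.56],["20160827",98760.12]]
numberOrderResult=[["20160828",43],["20160827",27]]
[["苏州_江苏_中国_2016082815",7135.56]]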

package com.xxx.storm.orderprocess;

import java.util.Map;

import backtype.storm.tuple.Values;
import storm.trident.operation.Function;
import storm.trident.operation.TridentCollector;
import storm.trident.operation.TridentOperationContext;
import storm.trident.tuple.TridentTuple;

/**
 * Concatenates several fields into a single key so that each aggregate can be
 * stored under one map key, e.g. 苏州_江苏_中国_2016082815.
 */
public class CombineKeyFun implements Function {

    private static final long serialVersionUID = -5609985189737017086L;

    @Override
    public void prepare(Map conf, TridentOperationContext context) {
        // nothing to prepare
    }

    @Override
    public void cleanup() {
        // nothing to clean up
    }

    @Override
    public void execute(TridentTuple tuple, TridentCollector collector) {
        // combine "yyyyMMddHHStr","country","province","city" into one key
        String yyyyMMddHHStr = tuple.getStringByField("yyyyMMddHHStr");
        String country = tuple.getStringByField("country");
        String province = tuple.getStringByField("province");
        String city = tuple.getStringByField("city");

        String newKey = city + "_" + province + "_" + country + "_" + yyyyMMddHHStr;
        collector.emit(new Values(newKey));
    }

}

package com.xxx.storm.orderprocess;

import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Map;

import backtype.storm.tuple.Values;
import storm.trident.operation.Function;
import storm.trident.operation.TridentCollector;
import storm.trident.operation.TridentOperationContext;
import storm.trident.tuple.TridentTuple;
/**
 * Parses a raw order record string into typed fields.
 */
public class OrderParseFunction implements Function {

    private static final long serialVersionUID = -8531306604648164614L;

    @Override
    public void prepare(Map conf, TridentOperationContext context) {
        // nothing to prepare
    }

    @Override
    public void cleanup() {
        // nothing to clean up
    }

    @Override
    public void execute(TridentTuple tuple, TridentCollector collector) {
        String orderRecord = tuple.getStringByField("str");
        
        if(orderRecord != null && !"".equals(orderRecord)){
            String[] orderDetails = orderRecord.replace("\"", "").split(" ");
            
            // "timestamp" "consumer" "productName" "price" "country" "province" "city"
            long timestamp = Long.valueOf(orderDetails[0]);
            
            Date date = new Date(timestamp);
            
            // SimpleDateFormat is not thread-safe, so fresh instances are created per call
            DateFormat yyyyMMdd = new SimpleDateFormat("yyyyMMdd");
            String yyyyMMddStr = yyyyMMdd.format(date);
            
            DateFormat yyyyMMddHH = new SimpleDateFormat("yyyyMMddHH");
            String yyyyMMddHHStr = yyyyMMddHH.format(date);
            
            DateFormat yyyyMMddHHmm = new SimpleDateFormat("yyyyMMddHHmm");
            String yyyyMMddHHmmStr = yyyyMMddHHmm.format(date);
            
            String consumer = orderDetails[1];
            String productName = orderDetails[2];
            double price = Double.valueOf(orderDetails[3]);
            String country = orderDetails[4];
            String province = orderDetails[5];
            String city = orderDetails[6];
            
            collector.emit(new Values(timestamp,yyyyMMddStr,yyyyMMddHHStr,yyyyMMddHHmmStr,consumer,productName,price,
                    country,province,city));
        }

    }

}
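
Fed the sample record from section 1, the function emits a single tuple along these lines (the hour and minute fields assume a UTC+8 local timezone):

1472368083000, 20160828, 2016082815, 201608281508, 张三丰, iPad, 3567.78, 中国, 江苏, 苏州

CombineKeyFun above then folds the hour and address fields of such a tuple into 苏州_江苏_中国_2016082815, which is exactly the key format queried over DRPC in the topology.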

package com.xxx.storm.orderprocess;

import java.util.Map;

import org.slf4j.Logger;

import backtype.storm.tuple.Values;
import storm.trident.operation.Aggregator;
import storm.trident.operation.TridentCollector;
import storm.trident.operation.TridentOperationContext;
import storm.trident.tuple.TridentTuple;

/**
 * Partial per-partition sum of sale amounts within a single batch.
 */
public class SaleSum implements Aggregator<SaleSumState> {
    
    private Logger logger  = org.slf4j.LoggerFactory.getLogger(SaleSum.class);
    

    private static final long serialVersionUID = -6879728480425771684L;

    private int partitionIndex;

    @Override
    public void prepare(Map conf, TridentOperationContext context) {
        partitionIndex = context.getPartitionIndex();
        logger.debug("partitionIndex=" + partitionIndex);
    }

    @Override
    public void cleanup() {
        // nothing to clean up
    }

    @Override
    public SaleSumState init(Object batchId, TridentCollector collector) {
        // fresh running total for each new batch
        return new SaleSumState();
    }

    @Override
    public void aggregate(SaleSumState val, TridentTuple tuple, TridentCollector collector) {
        // accumulate this tuple's price into the running per-partition total
        val.saleSum += tuple.getDoubleByField("price");
    }

    @Override
    public void complete(SaleSumState val, TridentCollector collector) {
        // emit the partition's partial sum once the batch is complete
        collector.emit(new Values(val.saleSum));
    }
}

package com.xxx.storm.orderprocess;

/**
 * Running per-partition sales total.
 */
public class SaleSumState {
    
    double saleSum = 0.0;

}
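
To see how the partial and global aggregations compose: with parallelismHint(5), a single batch containing, say, orders of 2345.89, 3567.78 and 6732.81 for day 20160828 spread over two partitions might yield the partial tuples (20160828, 5913.67) and (20160828, 6732.81); the Sum inside persistentAggregate then folds these into a global running total of 12646.48 for that day's key.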

Note: when HBase is used to store the aggregation results, the stored value layout differs by the Trident transaction level of the state:
NON-TRANSACTIONAL: the aggregate value only
TRANSACTIONAL: BATCH_ID + the aggregate value
OPAQUE TRANSACTIONAL: BATCH_ID + the aggregate value + the previous batch's value
Also, create a src/main/resources source folder in the Maven project and put hbase-site.xml into it so the HBase client can find its configuration.
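
To make the OPAQUE TRANSACTIONAL layout concrete, here is a minimal sketch of the per-key update rule that an opaque map state applies; it is illustrative only (the real class is storm.trident.state.OpaqueValue, and this is not Storm's actual code):

/**
 * Illustrative sketch of the opaque-state update rule (not Storm's implementation).
 */
class OpaqueValueSketch {
    Long currTxid;  // id of the batch that produced curr
    double curr;    // current aggregate value
    double prev;    // aggregate value before currTxid was applied

    // apply one batch's partial sum for the given batch id
    void update(long txid, double batchPartial) {
        if (currTxid != null && currTxid == txid) {
            // the same batch is being replayed: recompute from prev, not curr
            curr = prev + batchPartial;
        } else {
            // a new batch: remember the pre-update value, then apply
            prev = curr;
            curr = prev + batchPartial;
            currTxid = txid;
        }
    }
}

Because prev is kept alongside the BATCH_ID, a replayed batch overwrites its own earlier contribution instead of double-counting it; this is what lets the opaque spout provide exactly-once state updates over an at-least-once channel.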
