Project requirements: collect order details and analyze the data:
1. Total site sales and number of orders per day (per hour, per minute)
2. The same statistics broken down by region (country, province, city) and by time period
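For reference, every simulated order record handled below is one line of space-separated, double-quoted fields in the order timestamp, consumer, productName, price, country, province, city; for example (a made-up record in the format produced by the generator in step 1):
"1472373913000" "张三丰" "iPad" "3567.78" "中国" "江苏" "苏州"
All of the day/hour/minute keys and region keys used in the statistics are derived from these fields.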
1. Preparing the test data
package com.xxx.storm.test;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
/**
* Generates simulated order data and writes it to a Kafka topic.
*/
public class OrderDataGenerator {
// Order record layout:
// "timestamp" "consumer" "productName" "price" "country" "province" "city"
private static final String[] CONSUMERS = { "Merry", "John", "Tom", "Candy",
"张三丰", "周芷若", "张无忌", "令狐冲", "独孤九剑","郭靖", "杨过" };
private static final String[] PRODUCT_NAMES = { "华为笔记本", "iPad", "苹果电脑", "iPhone",
"乐视TV", "美的空调", "小米2", "魅族" };
private static final Map<String, Double> PRODUCT_PRICE = new HashMap<String, Double>();
static {
PRODUCT_PRICE.put("华为笔记本", 2345.89);
PRODUCT_PRICE.put("iPad", 3567.78);
PRODUCT_PRICE.put("苹果电脑", 23456.12);
PRODUCT_PRICE.put("iPhone", 6732.81);
PRODUCT_PRICE.put("乐视TV", 1234.76);
PRODUCT_PRICE.put("美的空调", 1260.32);
PRODUCT_PRICE.put("小米2", 2390.81);
PRODUCT_PRICE.put("魅族", 3456.72);
}
private static final String[] ADDRESSES = { "中国,上海,浦东", "中国,上海,杨浦", "中国,福建,厦门",
"中国,浙江,杭州", "中国,江苏,苏州", "中国,北京,通州",
"中国,北京,海淀" };
// "timestamp" "consumer" "productName" "price" "country" "province" "city"
/**
* Generates one simulated order record.
*
* @return a space-separated, double-quoted record string
*/
public static String generateOrderRecord() {
long timestamp = System.currentTimeMillis();
StringBuilder sbuilder = new StringBuilder("\""+timestamp + "\"");
Random r = new Random();
String consumer = CONSUMERS[r.nextInt(CONSUMERS.length)];
sbuilder.append(" \"" + consumer + "\"");
String productName = PRODUCT_NAMES[r.nextInt(PRODUCT_NAMES.length)];
sbuilder.append(" \"" + productName + "\"");
double price = PRODUCT_PRICE.get(productName);
sbuilder.append(" \"" + price + "\"");
String address = ADDRESSES[r.nextInt(ADDRESSES.length)];
String[] addrInfos = address.split(",");
sbuilder.append(" \"" + addrInfos[0] + "\"");
sbuilder.append(" \"" + addrInfos[1] + "\"");
sbuilder.append(" \"" + addrInfos[2] + "\"");
return sbuilder.toString();
}
public static void main(String[] args) {
KafkaProducer kafkaProducer = new KafkaProducer();
Producer<String,String> producer = kafkaProducer.getKafkaProducer("hadoop1:9092");
for (; ; ) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
String msgKey = System.currentTimeMillis()+ "";
String msg = OrderDataGenerator.generateOrderRecord();
// If the topic does not exist it is created automatically, using the broker defaults (replication-factor 1, num.partitions default of 1)
KeyedMessage<String, String> data = kafkaProducer.getKeyedMessage("test", msgKey, msg);
kafkaProducer.sendMessage(producer, data);
}
//kafkaProducer.close(producer);
}
}
package com.xxx.storm.test;
import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
/**
* Kafka producer helper (old 0.8 producer API).
*
*/
public class KafkaProducer {
/**
* Creates a Kafka producer for the given broker list.
* @param brokerList comma-separated broker list (host:port)
* @return the configured producer
*/
public Producer<String,String> getKafkaProducer(String brokerList){
// Set the producer configuration properties
Properties props = new Properties();
props.put("metadata.broker.list", brokerList);
props.put("serializer.class", "kafka.serializer.StringEncoder");
// key.serializer.class defaults to the value of serializer.class
props.put("key.serializer.class", "kafka.serializer.StringEncoder");
// Optional; if not configured, the default partitioner is used
props.put("partitioner.class", "com.xxx.storm.test.KafkaPartitioner");
// Enable the acknowledgement mechanism; otherwise sends are fire-and-forget and data may be lost.
// Valid values are 0, 1 and -1, see
// http://kafka.apache.org/08/configuration.html
props.put("request.required.acks", "1");
ProducerConfig config = new ProducerConfig(props);
// Create the producer
Producer<String, String> producer = new Producer<String, String>(config);
return producer;
}
/**
* Closes the Kafka producer.
* @param producer the producer to close
*/
public void close(Producer<String,String> producer){
producer.close();
}
/**
* Sends a message to Kafka.
* @param producer the producer to send with
* @param data the message to send
*/
public void sendMessage(Producer<String,String> producer,KeyedMessage<String, String> data){
producer.send(data);
}
/**
* Assembles a KeyedMessage.
* @param topic the target topic
* @param msgKey the message key
* @param msgContent the message body
* @return the assembled message
*/
public KeyedMessage<String,String>
getKeyedMessage(String topic,String msgKey,String msgContent){
KeyedMessage<String,String> data = new KeyedMessage<String,String>(topic,msgKey,msgContent);
return data;
}
}
package com.xxx.storm.test;
import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;
public class KafkaPartitioner implements Partitioner{
public KafkaPartitioner(VerifiableProperties properties) {
}
@Override
public int partition(Object obj, int numPartitions) {
int partition = 0;
if (obj instanceof String) {
String key=(String)obj;
int offset = key.lastIndexOf('.');
if (offset > 0) {
partition = Integer.parseInt(key.substring(offset + 1)) % numPartitions;
}
}else{
partition = obj.toString().length() % numPartitions;
}
return partition;
}
}
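A quick illustration of the partitioning logic (the keys below are made up): a string key of the form "xxx.N" is routed by its numeric suffix, any other string key falls through to partition 0, and a non-string key is routed by the length of its toString() value. Note that the generator above uses a plain millisecond timestamp as the message key, which contains no '.', so with this partitioner all test messages end up in partition 0.
package com.xxx.storm.test;
import java.util.Properties;
import kafka.utils.VerifiableProperties;
// Hypothetical demo class, not part of the project code
public class KafkaPartitionerDemo {
public static void main(String[] args) {
KafkaPartitioner partitioner = new KafkaPartitioner(new VerifiableProperties(new Properties()));
System.out.println(partitioner.partition("order.7", 3)); // suffix 7 -> 7 % 3 = 1
System.out.println(partitioner.partition("1472373913000", 3)); // no '.' -> partition 0
System.out.println(partitioner.partition(12345L, 3)); // non-string -> "12345".length() % 3 = 2
}
}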
2. Reading the data from Kafka, computing the statistics, and storing the results in memory and in HBase
package com.xxx.storm.orderprocess;
import org.apache.storm.hbase.trident.state.HBaseMapState;
import com.xxx.storm.trident.PrintTestFilter;
import com.xxx.storm.trident.SplitFunction1;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.LocalDRPC;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.tuple.Fields;
import storm.kafka.BrokerHosts;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;
import storm.kafka.trident.OpaqueTridentKafkaSpout;
import storm.kafka.trident.TransactionalTridentKafkaSpout;
import storm.kafka.trident.TridentKafkaConfig;
import storm.trident.Stream;
import storm.trident.TridentState;
import storm.trident.TridentTopology;
import storm.trident.operation.builtin.Count;
import storm.trident.operation.builtin.FilterNull;
import storm.trident.operation.builtin.MapGet;
import storm.trident.operation.builtin.Sum;
import storm.trident.state.OpaqueValue;
import storm.trident.state.StateFactory;
import storm.trident.testing.MemoryMapState;
/**
* Order processing topology.
*
*/
public class OrderProcessingTrident {
private static final String SPOUT_ID = "kafkaSpout";
public static void main(String[] args) {
// Build the TridentTopology
TridentTopology tridentTopology = new TridentTopology();
// Read messages from Kafka with a Trident Kafka spout
BrokerHosts hosts = new ZkHosts("hadoop1:2181");
String topic = "test";
TridentKafkaConfig config = new TridentKafkaConfig(hosts,topic);
config.forceFromStart = false;
config.scheme = new SchemeAsMultiScheme(new StringScheme());
OpaqueTridentKafkaSpout opaqueTridentKafkaSpout =
new OpaqueTridentKafkaSpout(config);
TransactionalTridentKafkaSpout transactionalTridentKafkaSpout =
new TransactionalTridentKafkaSpout(config);
// {"str":"xxxxxxxx"}
Stream stream = tridentTopology.newStream(SPOUT_ID, opaqueTridentKafkaSpout);
//stream.each(new Fields("str"), new PrintTestFilter());
// timestamp,yyyyMMddStr,yyyyMMddHHStr,yyyyMMddHHmmStr,consumer,productName,price,country,province,city
Stream hasParseStream = stream.each(new Fields("str"), new OrderParseFunction(),
new Fields("timestamp","yyyyMMddStr","yyyyMMddHHStr","yyyyMMddHHmmStr",
"consumer","productName","price",
"country","province","city"))
// .each(new Fields("str","timestamp","yyyyMMddStr","yyyyMMddHHStr",
// "yyyyMMddHHmmStr",
// "consumer","productName","price",
// "country","province","city"), new PrintTestFilter())
;
// 1. Total daily sales for the whole site
// Drop the key/values that are not needed downstream
Stream partitionStatisticsStream =
hasParseStream.project(new Fields("yyyyMMddStr","price"))
// Shuffle (random repartitioning)
.shuffle()
.groupBy(new Fields("yyyyMMddStr"))
.chainedAgg()
// Sum of the order amounts of each partition within the same batch
.partitionAggregate(new Fields("price"), new SaleSum(),
new Fields("saleTotalAmtOfPartDay"))
// Number of orders of each partition within the same batch
.partitionAggregate(new Count(), new Fields("numOrderOfPartDay"))
.chainEnd()
.parallelismHint(5)
.toStream()
;
// Global daily sales total
TridentState saleAmtState =
partitionStatisticsStream.groupBy(new Fields("yyyyMMddStr"))
.persistentAggregate(
new MemoryMapState.Factory(),
new Fields("saleTotalAmtOfPartDay"),new Sum(),
new Fields("saleGlobalAmtOfDay"))
;
// saleAmtState.newValuesStream()
// .each(new Fields("yyyyMMddStr","saleGlobalAmtOfDay"), new PrintTestFilter());
// ;
// Global daily total number of orders
TridentState numOfSaleState =
partitionStatisticsStream.groupBy(new Fields("yyyyMMddStr"))
.persistentAggregate(new MemoryMapState.Factory(),
new Fields("numOrderOfPartDay"), new Sum(),
new Fields("numOrderGlobalOfDay"));
// numOfSaleState.newValuesStream()
// .each(new Fields("yyyyMMddStr","numOrderGlobalOfDay"), new PrintTestFilter());
// state.newValuesStream()
// .each(new Fields("yyyyMMddStr","saleGlobalAmtOfDay"), new PrintTestFilter());
// Create a local DRPC server for local testing
LocalDRPC localDRPC = new LocalDRPC();
tridentTopology.newDRPCStream("saleAmtOfDay",localDRPC)
.each(new Fields("args"), new SplitFunction1(),new Fields("requestDate"))
.stateQuery(saleAmtState, new Fields("requestDate"),new MapGet(),
new Fields("saleGlobalAmtOfDay1"))
.project(new Fields("requestDate","saleGlobalAmtOfDay1"))
.each(new Fields("saleGlobalAmtOfDay1"), new FilterNull())
;
tridentTopology.newDRPCStream("numOrderOfDay",localDRPC)
.each(new Fields("args"), new SplitFunction1(),new Fields("requestDate"))
.stateQuery(numOfSaleState, new Fields("requestDate"),new MapGet(),
new Fields("numOrderGlobalOfDay1"))
.project(new Fields("requestDate","numOrderGlobalOfDay1"))
.each(new Fields("numOrderGlobalOfDay1"), new FilterNull())
;
// 2. Sales totals and order counts by region and hour (yyyyMMddHHStr)
// "timestamp","yyyyMMddStr","yyyyMMddHHStr","yyyyMMddHHmmStr",
// "consumer","productName","price",
// "country","province","city"
@SuppressWarnings("rawtypes")
HBaseMapState.Options<OpaqueValue> opts = new HBaseMapState.Options<OpaqueValue>();
opts.tableName ="saleTotalAmtOfAddrAndHour";
opts.columnFamily ="cf";
opts.qualifier = "sTAOAAH";
// The HBase table must be created beforehand, e.g. in the HBase shell:
// create 'saleTotalAmtOfAddrAndHour',{ NAME => 'cf' , VERSIONS => 1000}
StateFactory factory = HBaseMapState.opaque(opts);
TridentState saleTotalAmtOfAddrAndHourState =
hasParseStream.project(new Fields("yyyyMMddHHStr",
"price","country","province","city"))
.each(new Fields("yyyyMMddHHStr","country","province","city")
, new CombineKeyFun(), new Fields("addrAndHour"))
.project(new Fields("addrAndHour","price"))
.groupBy(new Fields("addrAndHour"))
.persistentAggregate(factory,new Fields("price"),
new Sum(), new Fields("saleTotalAmtOfAddrAndHour"));
saleTotalAmtOfAddrAndHourState.newValuesStream()
.each(new Fields("addrAndHour",
"saleTotalAmtOfAddrAndHour"), new PrintTestFilter());
tridentTopology.newDRPCStream("saleTotalAmtOfAddrAndHour", localDRPC)
.each(new Fields("args"), new SplitFunction1(),new Fields("requestAddrAndHour"))
.stateQuery(saleTotalAmtOfAddrAndHourState,new Fields("requestAddrAndHour"),
new MapGet(), new Fields("saleTotalAmtOfAddrAndHour"))
//.project(new Fields("requestAddrAndHour","saleTotalAmtOfAddrAndHour"))
//.each(new Fields("saleTotalAmtOfAddrAndHour"), new FilterNull())
;
Config conf = new Config();
if(args == null || args.length <=0){
// Local test mode
LocalCluster localCluster = new LocalCluster();
// The topology name must be unique
localCluster.submitTopology("orderProcessingTrident", conf, tridentTopology.build());
while(true){
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
String saleAmtResult =
localDRPC.execute("saleAmtOfDay", "20160828 20160827");
System.err.println("saleAmtResult=" +saleAmtResult);
String numberOrderResult =
localDRPC.execute("numOrderOfDay", "20160828 20160827");
System.err.println("numberOrderResult=" + numberOrderResult);
String saleTotalAmtOfAddrAndHourResult =
localDRPC.execute("saleTotalAmtOfAddrAndHour", "苏州_江苏_中国_2016082815");
System.err.println(saleTotalAmtOfAddrAndHourResult);
}
}else{
try {
StormSubmitter.submitTopology(args[0], conf, tridentTopology.build());
} catch (AlreadyAliveException e) {
e.printStackTrace();
} catch (InvalidTopologyException e) {
e.printStackTrace();
}
}
}
}
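PrintTestFilter and SplitFunction1 are reused from the earlier Trident examples in the com.xxx.storm.trident package and are not listed again here. For the DRPC streams above, SplitFunction1 only needs to split the DRPC argument string on spaces and emit one tuple per token, so that a call such as localDRPC.execute("saleAmtOfDay", "20160828 20160827") queries both dates. A minimal sketch, assuming exactly that behaviour:
package com.xxx.storm.trident;
import java.util.Map;
import backtype.storm.tuple.Values;
import storm.trident.operation.Function;
import storm.trident.operation.TridentCollector;
import storm.trident.operation.TridentOperationContext;
import storm.trident.tuple.TridentTuple;
// Sketch only: assumes SplitFunction1 simply tokenizes the DRPC args string on spaces
public class SplitFunction1 implements Function {
private static final long serialVersionUID = 1L;
@Override
public void prepare(Map conf, TridentOperationContext context) {
}
@Override
public void cleanup() {
}
@Override
public void execute(TridentTuple tuple, TridentCollector collector) {
// Split the DRPC "args" string on whitespace and emit each token as its own tuple
String args = tuple.getString(0);
for (String token : args.split(" ")) {
if (token.length() > 0) {
collector.emit(new Values(token));
}
}
}
}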
package com.xxx.storm.orderprocess;
import java.util.Map;
import backtype.storm.tuple.Values;
import storm.trident.operation.Function;
import storm.trident.operation.TridentCollector;
import storm.trident.operation.TridentOperationContext;
import storm.trident.tuple.TridentTuple;
/**
* Concatenates the region and hour fields into a single key to make the
* aggregated results easier to store and query.
*/
public class CombineKeyFun implements Function {
/**
*
*/
private static final long serialVersionUID = -5609985189737017086L;
@Override
public void prepare(Map conf, TridentOperationContext context) {
// TODO Auto-generated method stub
}
@Override
public void cleanup() {
// TODO Auto-generated method stub
}
@Override
public void execute(TridentTuple tuple, TridentCollector collector) {
//"yyyyMMddHHStr","country","province","city"
String yyyyMMddHHStr = tuple.getStringByField("yyyyMMddHHStr");
String country = tuple.getStringByField("country");
String province = tuple.getStringByField("province");
String city = tuple.getStringByField("city");
String newkey = city+"_"+province+"_"+country+"_"+yyyyMMddHHStr;
collector.emit(new Values(newkey));
}
}
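Note that the key layout produced here (city_province_country_yyyyMMddHH) is exactly what the saleTotalAmtOfAddrAndHour DRPC stream above expects as its argument. For example, an order placed in 中国/江苏/苏州 during the hour 2016082815 is aggregated under the key 苏州_江苏_中国_2016082815, which matches the query string used in the local test loop.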
package com.xxx.storm.orderprocess;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Map;
import backtype.storm.tuple.Values;
import storm.trident.operation.Function;
import storm.trident.operation.TridentCollector;
import storm.trident.operation.TridentOperationContext;
import storm.trident.tuple.TridentTuple;
/**
* Function that parses a raw order record into its individual fields.
*/
public class OrderParseFunction implements Function {
/**
*
*/
private static final long serialVersionUID = -8531306604648164614L;
@Override
public void prepare(Map conf, TridentOperationContext context) {
// TODO Auto-generated method stub
}
@Override
public void cleanup() {
// TODO Auto-generated method stub
}
@Override
public void execute(TridentTuple tuple, TridentCollector collector) {
String orderRecord = tuple.getStringByField("str");
if(orderRecord != null && !"".equals(orderRecord)){
String[] orderDetails = orderRecord.replace("\"", "").split(" ");
// "timestamp" "consumer" "productName" "price" "country" "province" "city"
long timestamp = Long.valueOf(orderDetails[0]);
Date date = new Date(timestamp);
DateFormat yyyyMMdd = new SimpleDateFormat("yyyyMMdd");
String yyyyMMddStr = yyyyMMdd.format(date);
DateFormat yyyyMMddHH = new SimpleDateFormat("yyyyMMddHH");
String yyyyMMddHHStr = yyyyMMddHH.format(date);
DateFormat yyyyMMddHHmm = new SimpleDateFormat("yyyyMMddHHmm");
String yyyyMMddHHmmStr = yyyyMMddHHmm.format(date);
String consumer = orderDetails[1];
String productName = orderDetails[2];
double price = Double.valueOf(orderDetails[3]);
String country = orderDetails[4];
String province = orderDetails[5];
String city = orderDetails[6];
collector.emit(new Values(timestamp,yyyyMMddStr,yyyyMMddHHStr,yyyyMMddHHmmStr,consumer,productName,price,
country,province,city));
}
}
}
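As an example, a raw record such as "1472373913000" "张三丰" "iPad" "3567.78" "中国" "江苏" "苏州" (made-up timestamp) has its quotes stripped and is split on spaces; assuming the timestamp falls at 15:04 on 2016-08-28 in the local time zone, the emitted tuple is (1472373913000L, "20160828", "2016082815", "201608281504", "张三丰", "iPad", 3567.78, "中国", "江苏", "苏州").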
package com.xxx.storm.orderprocess;
import java.util.Map;
import org.slf4j.Logger;
import backtype.storm.tuple.Values;
import storm.trident.operation.Aggregator;
import storm.trident.operation.TridentCollector;
import storm.trident.operation.TridentOperationContext;
import storm.trident.tuple.TridentTuple;
/**
* Partial aggregation: sums the order amounts of each partition within a batch.
*/
public class SaleSum implements Aggregator<SaleSumState> {
private Logger logger = org.slf4j.LoggerFactory.getLogger(SaleSum.class);
/**
*
*/
private static final long serialVersionUID = -6879728480425771684L;
private int partitionIndex ;
@Override
public void prepare(Map conf, TridentOperationContext context) {
partitionIndex = context.getPartitionIndex();
//System.err.println(partitionIndex);
logger.debug("partitionIndex=" + partitionIndex);
//logger.info(arg0);
//logger.error(arg0);
}
@Override
public void cleanup() {
// TODO Auto-generated method stub
}
@Override
public SaleSumState init(Object batchId, TridentCollector collector) {
return new SaleSumState();
}
@Override
public void aggregate(SaleSumState val, TridentTuple tuple, TridentCollector collector) {
double oldSaleSum = val.saleSum;
double price = tuple.getDoubleByField("price");
double newSaleSum = oldSaleSum + price ;
val.saleSum = newSaleSum;
}
@Override
public void complete(SaleSumState val, TridentCollector collector) {
// System.err.println("SaleSum---> partitionIndex=" + this.partitionIndex
// + ",saleSum=" + val.saleSum);
collector.emit(new Values(val.saleSum));
}
}
package com.xxx.storm.orderprocess;
/**
* State object holding the partial sales sum of one partition.
*/
public class SaleSumState {
double saleSum = 0.0;
}
Note: when HBase is used to store the aggregation results, the value written for a key depends on the Trident transaction level:
NON-TRANSACTIONAL: the aggregate value only
TRANSACTIONAL: BATCH_ID plus the aggregate value
OPAQUE TRANSACTIONAL: BATCH_ID, the aggregate value, and the previous batch's aggregate value
In addition, create a src/main/resources source folder in the Maven project and put hbase-site.xml in it so the topology can pick up the HBase configuration.
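Concretely, these three shapes correspond to Trident's plain value, TransactionalValue and OpaqueValue types; the opaque variant used above also keeps the previous batch's value so Trident can roll back and reapply a batch that is replayed. A minimal illustration of the three value shapes (the batch id and amounts below are made-up numbers):
package com.xxx.storm.test;
import storm.trident.state.OpaqueValue;
import storm.trident.state.TransactionalValue;
// Illustration only; not part of the project code
public class StateValueShapes {
public static void main(String[] args) {
// NON-TRANSACTIONAL: the cell holds only the aggregate, e.g. 12345.67
Double nonTransactional = 12345.67;
// TRANSACTIONAL: the cell holds the batch id together with the aggregate
TransactionalValue<Double> transactional = new TransactionalValue<Double>(42L, 12345.67);
// OPAQUE TRANSACTIONAL (used above): batch id, current aggregate, previous batch's aggregate
OpaqueValue<Double> opaque = new OpaqueValue<Double>(42L, 12345.67, 12000.00);
System.out.println(nonTransactional);
System.out.println(transactional.getTxid() + " -> " + transactional.getVal());
System.out.println(opaque.getCurrTxid() + " -> " + opaque.getCurr() + " (prev " + opaque.getPrev() + ")");
}
}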