Data Algorithms: Hadoop/Spark Big Data Processing --- Chapter 6

The Problem This Chapter Solves

Moving average: the average computed over a sliding window of recent values rather than over the whole series, much like the 5-minute, 10-minute, hourly, and weekly lines you can switch between when watching a stock in 同花顺. The examples use records of the form (k, t, v), where k is the key (the stock name), t is the time (hour/minute/second), and v is the associated value (e.g., the price).


Input and Output for This Chapter

Stock prices arrive as (k, t, v) records; for each key, the job outputs (k, t, average over the window ending at t). For example, with a window of size 3 over the prices 10, 18, 20, 30, the helper classes below produce 10.0, 14.0, 16.0, and 22.67 (partial windows are averaged over the values seen so far).


This chapter covers four implementations:

  1. MapReduce based on in-memory sorting
  2. MapReduce using a partition-and-group (secondary sort) shuffle
  3. Traditional in-memory Scala implementation
  4. Scala implementation based on partitioning and grouping

First, two ways to implement a moving average in plain Java.

// Queue-based implementation
import java.util.LinkedList;
import java.util.Queue;

public class SimpleMovingAverage {
    private double sum = 0;
    private int period;
    private Queue<Double> window = new LinkedList<>();

    public SimpleMovingAverage(int period) {
        this.period = period;
    }

    public void addNumber(double number) {
        sum += number;
        window.add(number);
        // once the window is full, evict the oldest value from the running sum
        if (window.size() > period) {
            sum -= window.remove();
        }
    }

    public double getMovingAverage() {
        return sum / window.size();
    }
}
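
A quick usage sketch for the queue-based class (illustrative, not from the book), reusing the test data from the array version's main() below:

// Illustrative usage of SimpleMovingAverage
public class SimpleMovingAverageDemo {
    public static void main(String[] args) {
        double[] testData = {10, 18, 20, 30, 24, 33, 27};
        SimpleMovingAverage sma = new SimpleMovingAverage(3);
        for (double x : testData) {
            sma.addNumber(x);
            // prints 10.0, 14.0, 16.0, then 22.67, 24.67, 29.0, 28.0 (rounded)
            System.out.println("Next Number = " + x + " sma: " + sma.getMovingAverage());
        }
    }
}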


// Array-based implementation
public class SimpleMovingAverageUsingArray {
    private double sum = 0;
    private int period;
    private double[] window = null;
    private int pointer = 0;
    private int size = 0;

    public SimpleMovingAverageUsingArray(int period) {
        this.period = period;
        window = new double[period];
    }

    public void addNewNumber(double number) {
        sum += number;
        if (size < period) {
            // window not yet full: just append
            window[pointer++] = number;
            size++;
        } else {
            // window full: overwrite the oldest slot (circular buffer)
            pointer = pointer % period;
            sum -= window[pointer];
            window[pointer++] = number;
        }
    }

    public double getMovingAverage() {
        return sum / size;
    }

    public static void main(String[] args) {
        double[] testData = {10, 18, 20, 30, 24, 33, 27};
        int[] allWindowSizes = {3, 4};
        for (int windowSize : allWindowSizes) {
            SimpleMovingAverageUsingArray sau = new SimpleMovingAverageUsingArray(windowSize);
            for (double x : testData) {
                sau.addNewNumber(x);
                System.out.println("Next Number = " + x + " sau: " + sau.getMovingAverage());
            }
            System.out.println("");
        }
    }
}

MapReduce Based on In-Memory Sorting

The input format is <name-as-string><,><timestamp><,><value-as-double> (e.g., a line like GOOG,2013-07-01,534.53, with illustrative values). The map key is <name-as-string> and the map value is (<timestamp><,><value-as-double>).


Class Name  Description
SortInMemory_MovingAverageDriver  Driver program that submits the Hadoop job
SortInMemory_MovingAverageMapper  Defines the map() function
SortInMemory_MovingAverageReducer  Defines the reduce() function
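
The driver class is not excerpted in this post. A minimal sketch of what SortInMemory_MovingAverageDriver has to wire up, assuming the new org.apache.hadoop.mapreduce API and a <window> <input-dir> <output-dir> argument order (the wiring here is illustrative, not the book's exact code):

// Illustrative driver sketch
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SortInMemory_MovingAverageDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // the window size consumed by the reducer's setup()
        conf.setInt("moving.average.window.size", Integer.parseInt(args[0]));

        Job job = Job.getInstance(conf, "SortInMemory_MovingAverage");
        job.setJarByClass(SortInMemory_MovingAverageDriver.class);
        job.setMapperClass(SortInMemory_MovingAverageMapper.class);
        job.setReducerClass(SortInMemory_MovingAverageReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(TimeSeriesData.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path(args[1]));
        FileOutputFormat.setOutputPath(job, new Path(args[2]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}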
// map() function; reducerKey/reducerValue are fields reused across calls
   private Text reducerKey = new Text();
   private TimeSeriesData reducerValue = new TimeSeriesData();

   public void map(LongWritable key, Text value, Context context)
       throws IOException, InterruptedException {
       String record = value.toString();
       if ((record == null) || (record.length() == 0)) {
          return;
       }
       String[] tokens = StringUtils.split(record.trim(), ",");
       if (tokens.length == 3) {
          // tokens[0] = name of timeseries as string
          // tokens[1] = timestamp
          // tokens[2] = value of timeseries as double
          Date date = DateUtil.getDate(tokens[1]);
          if (date == null) {
             return;
          }
          // emit key = name of the time series,
          // value = (timestamp, value); the TimeSeriesData JavaBean holds this pair
          reducerKey.set(tokens[0]); // set the name as key
          reducerValue.set(date.getTime(), Double.parseDouble(tokens[2]));
          context.write(reducerKey, reducerValue);
       }
       else {
          // log as error, not enough tokens
       }
   }
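
The TimeSeriesData JavaBean is not excerpted either. A minimal sketch of what the mapper above and the reducer below require of it (set(), getters, copy(), Writable serialization, and a timestamp-based compareTo); the field layout is assumed:

// Sketch of the TimeSeriesData value class (illustrative field layout)
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;

public class TimeSeriesData implements Writable, Comparable<TimeSeriesData> {
    private long timestamp;
    private double value;

    public void set(long timestamp, double value) {
        this.timestamp = timestamp;
        this.value = value;
    }
    public long getTimestamp() { return timestamp; }
    public double getValue() { return value; }

    // Hadoop reuses value objects while iterating, so the reducer copies each one
    public static TimeSeriesData copy(TimeSeriesData tsd) {
        TimeSeriesData copy = new TimeSeriesData();
        copy.set(tsd.timestamp, tsd.value);
        return copy;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(timestamp);
        out.writeDouble(value);
    }
    @Override
    public void readFields(DataInput in) throws IOException {
        timestamp = in.readLong();
        value = in.readDouble();
    }
    // sort by timestamp so Collections.sort() orders a reducer's values chronologically
    @Override
    public int compareTo(TimeSeriesData other) {
        return Long.compare(this.timestamp, other.timestamp);
    }
    @Override
    public String toString() {
        return "(" + timestamp + "," + value + ")";
    }
}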


// reduce() function; the reducer reads the window size from the job configuration in setup()
    private int windowSize = 5;

    public void setup(Context context)
        throws IOException, InterruptedException {
        this.windowSize = context.getConfiguration().getInt("moving.average.window.size", 5);
        System.out.println("setup(): windowSize="+windowSize);
    }

    public void reduce(Text key, Iterable<TimeSeriesData> values, Context context)
        throws IOException, InterruptedException {

        System.out.println("reduce(): key="+key.toString());

        // build the unsorted list of timeseries;
        // Hadoop reuses the value object, so each element must be copied
        List<TimeSeriesData> timeseries = new ArrayList<TimeSeriesData>();
        for (TimeSeriesData tsData : values) {
            TimeSeriesData copy = TimeSeriesData.copy(tsData);
            timeseries.add(copy);
        }

        // TimeSeriesData implements compareTo(), so this sorts by timestamp
        Collections.sort(timeseries);
        System.out.println("reduce(): timeseries="+timeseries.toString());

        // pre-accumulate the first windowSize-1 values
        double sum = 0.0;
        for (int i=0; i < windowSize-1; i++) {
            sum += timeseries.get(i).getValue();
        }

        // from the first full window onward, emit one moving average per element
        Text outputValue = new Text();
        for (int i = windowSize-1; i < timeseries.size(); i++) {
            System.out.println("reduce(): key="+key.toString() + "  i="+i);
            sum += timeseries.get(i).getValue();
            double movingAverage = sum / windowSize;
            long timestamp = timeseries.get(i).getTimestamp();
            outputValue.set(DateUtil.getDateAsString(timestamp) + "," + movingAverage);
            // send output to HDFS
            context.write(key, outputValue);

            // subtract the oldest value in the window, ready for the next iteration
            sum -= timeseries.get(i-windowSize+1).getValue();
        }
    } // reduce
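
To make the incremental window update concrete, here is a short trace with windowSize = 3 over the sorted values 10, 18, 20, 30 (the test data from the standalone Java classes above). Note that, unlike SimpleMovingAverage, this reducer emits nothing until the first full window:

  pre-loop: sum = 10 + 18 = 28
  i = 2: sum = 28 + 20 = 48, emit 48 / 3 = 16.0, then sum = 48 - 10 = 38
  i = 3: sum = 38 + 30 = 68, emit 68 / 3 ≈ 22.67, then sum = 68 - 18 = 50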

MapReduce Using a Partition-and-Group Shuffle (Secondary Sort)

Instead of sorting in memory, this version relies on a secondary sort: the sorting happens during the shuffle phase.

Class Name  Description
CompositeKey  Defines a custom composite key {String, timestamp}
CompositeKeyComparator  Sorts composite keys: by name first, then by timestamp
MovingAverage  A simple moving average implementation
NaturalKeyGroupingComparator  Groups composite keys by their natural key during the shuffle
NaturalKeyPartitioner  Partitions map output by the natural key before the shuffle
SortByMRF_MovingAverageDriver  Driver program for the Hadoop job
SortByMRF_MovingAverageMapper  Defines the map() function
SortByMRF_MovingAverageReducer  Defines the reduce() function
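
CompositeKey for this MapReduce version is not excerpted (the Scala case class near the end of the post is its Spark counterpart). A minimal sketch, assuming a {name, timestamp} layout with the set()/getName()/getTimestamp() methods the mapper and comparators use:

// Sketch of the CompositeKey {name, timestamp} (illustrative field layout)
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;

public class CompositeKey implements WritableComparable<CompositeKey> {
    private String name;      // natural key: the stock name
    private long timestamp;   // secondary key: time of the observation

    public void set(String name, long timestamp) {
        this.name = name;
        this.timestamp = timestamp;
    }
    public String getName() { return name; }
    public long getTimestamp() { return timestamp; }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(name);
        out.writeLong(timestamp);
    }
    @Override
    public void readFields(DataInput in) throws IOException {
        name = in.readUTF();
        timestamp = in.readLong();
    }
    // natural ordering: name first, then timestamp
    @Override
    public int compareTo(CompositeKey other) {
        int cmp = name.compareTo(other.name);
        return (cmp != 0) ? cmp : Long.compare(timestamp, other.timestamp);
    }
}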
// Shuffle configuration in the job driver
  // partition by the natural key (hash of the stock name)
  jobconf.setPartitionerClass(NaturalKeyPartitioner.class);
  // full sort of composite keys: by name, then by timestamp
  jobconf.setOutputKeyComparatorClass(CompositeKeyComparator.class);
  // group values by the natural key, so each reduce() call sees one stock
  jobconf.setOutputValueGroupingComparator(NaturalKeyGroupingComparator.class);
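
The three shuffle plug-ins above are likewise not excerpted. Minimal sketches, assuming the old org.apache.hadoop.mapred API that jobconf implies and the getName()/getTimestamp() getters from the CompositeKey sketch (in a real project each public class goes in its own file):

// Illustrative sketches of the shuffle plug-ins
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Partitioner;

// Partition by the natural key (the stock name) only, so every record
// of one stock lands in the same partition.
public class NaturalKeyPartitioner implements Partitioner<CompositeKey, TimeSeriesData> {
    @Override
    public int getPartition(CompositeKey key, TimeSeriesData value, int numberOfPartitions) {
        return Math.abs(key.getName().hashCode() % numberOfPartitions);
    }
    @Override
    public void configure(JobConf jobconf) {
        // no configuration needed
    }
}

// Full sort of the composite key: by name first, then by timestamp,
// so each reducer sees a stock's values in chronological order.
public class CompositeKeyComparator extends WritableComparator {
    protected CompositeKeyComparator() {
        super(CompositeKey.class, true);
    }
    @Override
    public int compare(WritableComparable w1, WritableComparable w2) {
        CompositeKey k1 = (CompositeKey) w1;
        CompositeKey k2 = (CompositeKey) w2;
        int cmp = k1.getName().compareTo(k2.getName());
        return (cmp != 0) ? cmp : Long.compare(k1.getTimestamp(), k2.getTimestamp());
    }
}

// Group by the natural key only, so one reduce() call receives all of the
// (already sorted) values for a single stock.
public class NaturalKeyGroupingComparator extends WritableComparator {
    protected NaturalKeyGroupingComparator() {
        super(CompositeKey.class, true);
    }
    @Override
    public int compare(WritableComparable w1, WritableComparable w2) {
        return ((CompositeKey) w1).getName().compareTo(((CompositeKey) w2).getName());
    }
}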


// MovingAverage computes the moving average with the array-based (circular buffer) approach;
// fields and constructor shown for completeness, mirroring SimpleMovingAverageUsingArray
public class MovingAverage {
    private double sum = 0.0;
    private int period;
    private double[] window;
    private int pointer = 0;
    private int size = 0;

    public MovingAverage(int period) {
        this.period = period;
        window = new double[period];
    }

    public void addNewNumber(double number) {
        sum += number;
        if (size < period) {
            window[pointer++] = number;
            size++;
        }
        else {
            // size == period (size cannot be > period)
            pointer = pointer % period;
            sum -= window[pointer];
            window[pointer++] = number;
        }
    }

    public double getMovingAverage() {
        if (size == 0) {
            throw new IllegalArgumentException("average is undefined");
        }
        return sum / size;
    }
}


// map() function; reducerKey/reducerValue are fields reused across calls
   private CompositeKey reducerKey = new CompositeKey();
   private TimeSeriesData reducerValue = new TimeSeriesData();

   @Override
    public void map(LongWritable inkey, Text value,
            OutputCollector<CompositeKey, TimeSeriesData> output,
            Reporter reporter) throws IOException {
        String record = value.toString();
        if ((record == null) || (record.length() == 0)) {
            return;
        }
        String[] tokens = StringUtils.split(record, ",");
        if (tokens.length == 3) {
            // tokens[0] = name of timeseries as string
            // tokens[1] = timestamp
            // tokens[2] = value of timeseries as double
            Date date = DateUtil.getDate(tokens[1]);
            if (date == null) {
                return;
            }
            long timestamp = date.getTime();
            // reducerKey   = (name, timestamp)
            // reducerValue = (timestamp, value)
            reducerKey.set(tokens[0], timestamp);
            reducerValue.set(timestamp, Double.parseDouble(tokens[2]));
            // emit key-value pair
            output.collect(reducerKey, reducerValue);
        }
        else {
            // log as error, not enough tokens
        }
    }
// reduce() function; this.windowSize is read from the job configuration in configure() (not shown)
 @Override
    public void reduce(CompositeKey key,
            Iterator<TimeSeriesData> values,
            OutputCollector<Text, Text> output,
            Reporter reporter)
            throws IOException {

        // note that values are already sorted (by the secondary sort);
        // apply the moving average algorithm to the sorted timeseries
        Text outputKey = new Text();
        Text outputValue = new Text();
        // create a moving-average window of the configured size
        MovingAverage ma = new MovingAverage(this.windowSize);
        while (values.hasNext()) {
            TimeSeriesData data = values.next();
            ma.addNewNumber(data.getValue());
            // read back the current moving average
            double movingAverage = ma.getMovingAverage();
            long timestamp = data.getTimestamp();
            String dateAsString = DateUtil.getDateAsString(timestamp);
            outputValue.set(dateAsString + "," + movingAverage);
            outputKey.set(key.getName());
            output.collect(outputKey, outputValue);
        }
    }

Traditional In-Memory Scala Implementation

// Traditional Scala version based on in-memory sorting
 def main(args: Array[String]): Unit = {
    if (args.size < 3) {
      println("Usage: MovingAverageInMemory <window> <input-dir> <output-dir>")
      sys.exit(1)
    }
    // create a SparkConf object
    val sparkConf = new SparkConf().setAppName("MovingAverageInMemory")
    val sc = new SparkContext(sparkConf)

    val window = args(0).toInt
    val input = args(1)
    val output = args(2)
    // make the window size available to all executors as a broadcast variable
    val broadcastWindow = sc.broadcast(window)

    val rawData = sc.textFile(input)
    val keyValue = rawData.map(line => {
      val tokens = line.split(",")
      (tokens(0), (tokens(1), tokens(2).toDouble))
    })

    // group by key; keys are stock symbols such as IBM, GOOG, AAPL, etc.
    val groupByStockSymbol = keyValue.groupByKey()

    val result = groupByStockSymbol.mapValues(values => {
      val dateFormat = new java.text.SimpleDateFormat("yyyy-MM-dd")
      // sort in memory; mapValues touches only the values,
      // where s._1 is the date and s._2 is the price
      val sortedValues = values.map(s => (dateFormat.parse(s._1).getTime, s._2)).toSeq.sortBy(_._1)
      val queue = new scala.collection.mutable.Queue[Double]()
      // push each value onto the queue, evicting the oldest once the window is full
      for (tup <- sortedValues) yield {
        queue.enqueue(tup._2)
        if (queue.size > broadcastWindow.value)
          queue.dequeue()

        (dateFormat.format(new java.util.Date(tup._1)), queue.sum / queue.size)
      }
    })

    // emit the final result in CSV format:
    // <stock_symbol><,><date><,><moving_average>
    val formattedResult = result.flatMap(kv => {
      kv._2.map(v => kv._1 + "," + v._1 + "," + v._2.toString)
    })
    formattedResult.saveAsTextFile(output)

    // done
    sc.stop()
  }

Scala Implementation Based on Partitioning and Grouping

// Scala implementation based on secondary sort
 def main(args: Array[String]): Unit = {
    if (args.size < 4) {
      println("Usage: MovingAverage <window> <number-of-partitions> <input-dir> <output-dir>")
      sys.exit(1)
    }

    val sparkConf = new SparkConf().setAppName("MovingAverage")
    val sc = new SparkContext(sparkConf)

    val window = args(0).toInt
    val numPartitions = args(1).toInt // number of partitions for the secondary sort; choose a high value
    val input = args(2)
    val output = args(3)

    val broadcastWindow = sc.broadcast(window)

    val rawData = sc.textFile(input)

    // the key contains part of the value (the closing date in this case)
    val valueToKey = rawData.map(line => {
      val tokens = line.split(",")
      val dateFormat = new java.text.SimpleDateFormat("yyyy-MM-dd")
      val timestamp = dateFormat.parse(tokens(1)).getTime
      (CompositeKey(tokens(0), timestamp), TimeSeriesData(timestamp, tokens(2).toDouble))
    })

    // secondary sort via partitioning and grouping: CompositeKeyPartitioner assigns
    // partitions, and the implicit Ordering on CompositeKey sorts within each partition
    val sortedData = valueToKey.repartitionAndSortWithinPartitions(new CompositeKeyPartitioner(numPartitions))
    // strip the key down to the stock symbol:
    // (CompositeKey(stockSymbol, timestamp), TimeSeriesData(timestamp, price))
    // becomes (stockSymbol, TimeSeriesData(timestamp, price))
    val keyValue = sortedData.map(k => (k._1.stockSymbol, k._2))
    val groupByStockSymbol = keyValue.groupByKey()

    // operate on the values; unlike the in-memory version,
    // no sort is performed here since the shuffle already sorted them
    val movingAverage = groupByStockSymbol.mapValues(values => {
      val dateFormat = new java.text.SimpleDateFormat("yyyy-MM-dd")
      val queue = new scala.collection.mutable.Queue[Double]()
      for (tsd <- values) yield {
        queue.enqueue(tsd.closingStockPrice)
        if (queue.size > broadcastWindow.value)
          queue.dequeue()

        (dateFormat.format(new java.util.Date(tsd.timeStamp)), queue.sum / queue.size)
      }
    })

    // output will be in CSV format
    // <stock_symbol><,><date><,><moving_average>
    val formattedResult = movingAverage.flatMap(kv => {
      kv._2.map(v => kv._1 + "," + v._1 + "," + v._2.toString)
    })
    formattedResult.saveAsTextFile(output)

    // done
    sc.stop()
  }

// Two case classes that serve as simple JavaBeans
case class CompositeKey(stockSymbol: String, timeStamp: Long)
case class TimeSeriesData(timeStamp: Long, closingStockPrice: Double)

// Ordering for CompositeKey, i.e., the sort order used within each partition
object CompositeKey {
  implicit def ordering[A <: CompositeKey]: Ordering[A] = {
    Ordering.by(fk => (fk.stockSymbol, fk.timeStamp))
  }
}


//---------------------------------------------------------
// the following class defines a custom partitioner by
// extending abstract class org.apache.spark.Partitioner
//---------------------------------------------------------
import org.apache.spark.Partitioner

class CompositeKeyPartitioner(partitions: Int) extends Partitioner {
  require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")

  def numPartitions: Int = partitions
  // getPartition maps a key to its partition by hashing the stock symbol
  def getPartition(key: Any): Int = key match {
    case k: CompositeKey => math.abs(k.stockSymbol.hashCode % numPartitions)
    case null            => 0
    case _               => math.abs(key.hashCode % numPartitions)
  }

  override def equals(other: Any): Boolean = other match {
    case h: CompositeKeyPartitioner => h.numPartitions == numPartitions
    case _                          => false
  }

  override def hashCode: Int = numPartitions
}

