数据清洗例子

//所有数据的 key,省略一些值
public static final String colomns = "touch_type,touch_time,event_type,click_time,customer_user_id...";

/**
 * 正则表达式 截取用户行为埋点值
 */
public static final String REGUBC = "(.*?)=(.*?)&";

/**
 * 定义hive分隔符
 */
public static final String HIVE_SEPERATE = "\001";

下面是 Main 方法中要执行的:

//从 HDFS 中加载文件,转成RDD
JavaRDD<String> logRdd = javaSparkContext.textFile(scanFiles);

final Broadcast<String[]> broadcast = javaSparkContext.broadcast(colomns.split(","));

//把原始数据 通过正则表达式过滤,最终转换为 JSONObject 类型的 HashSet
JavaRDD<JSONObject> jsonRdd = logRdd.mapPartitions(new FlatMapFunction<Iterator<String>, JSONObject>() {
    final String[] broadcastValue = broadcast.value();

    @Override
    public Iterable<JSONObject> call(Iterator<String> stringIterator) throws Exception {
        Set<JSONObject> jsonObjs = new HashSet<JSONObject>();

        while(stringIterator.hasNext()){
            //把 log 数据取出来
            String inputValue = stringIterator.next();
            int startIndex = inputValue.indexOf("_app.gif?");
            int endIndex = inputValue.lastIndexOf("HTTP/");
            String eventLog = inputValue.substring(startIndex + 9 , endIndex-1) + "&";

            //通过正则表达式匹配,存到 Map
            Matcher m = CommonUtil.getMatcher(eventLog, REGUBC);
            JSONObject jsonObj = new JSONObject();
            Map<String, String> map = new HashMap<>();
            while (m.find()) {
                map.put(m.group(1), m.group(2));
            }

            //Map <--> colomns,保存到 JSONObject
            for (int i = 0; i<broadcastValue.length; i++) {
                String columnName = broadcastValue[i];
                if (map.containsKey(columnName)){
                    //String value = URLDecoder.decode(map.get(columnName) , "UTF-8");
                    jsonObj.put(columnName, map.get(columnName));
                } else {
                    jsonObj.put(columnName,"");
                }
            }

            //保存到 JSONObject 类型的 HashSet 中
            jsonObjs.add(jsonObj);

        }

        return jsonObjs;
    }
});

//从 jsonObject 中 取出 value,并连接起来
JavaRDD<String> javaJsonRdd = jsonRdd.map(new Function<JSONObject, String>() {
    @Override
    public String call(JSONObject jsonObject) throws Exception {
        List<String> valuesList = new ArrayList<String>(jsonObject.values());
        //把 list 中的数据 连接起来,比如:Joiner.on("; ").join("tom", "jerry", "jack") => "tom; jerry; jack"
        String hiveEvent = Joiner.on(HIVE_SEPERATE).join(valuesList.iterator());
        return hiveEvent;
    }
});

//保存到 HDFS
javaJsonRdd.saveAsTextFile(hdfsOutputFilePath);
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • 1.ios高性能编程 (1).内层 最小的内层平均值和峰值(2).耗电量 高效的算法和数据结构(3).初始化时...
    欧辰_OSR阅读 29,547评论 8 265
  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,868评论 18 139
  • 1. Java基础部分 基础部分的顺序:基本语法,类相关的语法,内部类的语法,继承相关的语法,异常的语法,线程的语...
    子非鱼_t_阅读 31,754评论 18 399
  • 1------------------------------------------------摘录2-----...
    Ada_Bleau阅读 144评论 0 0
  • 2018年1月20日 回家的火车上,没买到其他票的我奢侈一把 在软卧上拿出了今天79分的数学试卷和海天英语小课堂(...
    瓦尔登荷塘阅读 315评论 0 0