03_一个基于java,hadoop的数据采集程序demo

1.文件分布

文件分布

2. 环境搭建

参考:02_在window环境开发hadoop_HDFS

3 需要采集的目录

image.png

4 说明:

  1. 需要采集的文件在accesslog里面并且以.log结尾
  2. 采集的思路是
    1. 先从accesslog移动进temp文件夹
    2. 从temp向HDFS上传
    3. 将temp里面的文件移动进backup文件夹
  3. 此demo是采用读取配置文件来得到路径以及配置信息
  4. 读取配置文件采用单列模式的懒汉式并考虑了线程安全

5. code

5.1 config.properties

LOG_SOURCE_DIR=H:/logs/accesslog/
LOG_TEMP_DIR=H:/logs/temp/
LOG_BACKUP_DIR=H:/logs/backup/

LOG_TIMEOUT=24
LOG_NAME=.log

HDFS_URL=hdfs://vm01:9000/
HDFS_DIR=/logs/
HDFS_F_NAME=access_log_
HDFS_L_NAME=.log

5.2 PropertiesHolderLaze

package com.looc.D02数据采集demo;

import java.io.IOException;
import java.util.Properties;

/**
 * 单列模式:懒汉式——并考虑线程安全
 * @author chenPeng
 * @version 1.0.0
 * @ClassName PropertiesHolderLaze.java
 * @Description TODO
 * @createTime 2019年01月28日 19:47:00
 */
public class PropertiesHolderLaze {
    private static Properties pros = null;

    public static Properties getPros() throws IOException {
        if (pros==null){
            synchronized (PropertiesHolderLaze.class){
                if (pros==null){
                    pros = new Properties();
                    pros.load(PropertiesHolderLaze.class.getClassLoader().getResourceAsStream(
                            "config.properties"));
                }
            }
        }
        return pros;
    }
}

5.3 CollectionData

package com.looc.D02数据采集demo;


import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Properties;
import java.util.TimerTask;
import java.util.UUID;

/**
 * @author chenPeng
 * @version 1.0.0
 * @ClassName CollectionData.java
 * @Description TODO
 * @createTime 2019年01月28日 19:18:00
 */
public class CollectionData extends TimerTask {

    private String LOG_SOURCE_DIR;
    private String LOG_TEMP_DIR;
    private String LOG_BACKUP_DIR;
    private String LOG_TIMEOUT;
    private String LOG_NAME;
    private String HDFS_URL;
    private String HDFS_DIR;
    private String HDFS_F_NAME;
    private String HDFS_L_NAME;
    private String TIME_MATE = "yyyy-MM-dd";


    /**
     * The action to be performed by this timer task.
     */
    @Override
    public void run() {
        //移动数据到temp
        //temp上传到hdfs
        //temp移动到backup
        try {
            init();
            //移动之前判断是否有文件
            if (!isNullDir()){
                moveToTemp();
                setUpToHdfs();
                tempToBackup();
            }
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (URISyntaxException e) {
            e.printStackTrace();
        }
    }

    /**
     * 初始化得到配置文件
     * @Author chenpeng
     * @Description //TODO
     * @Date 20:19
     * @Param []
     * @return void
     **/
    public void init() throws IOException {
        Properties pros = PropertiesHolderLaze.getPros();

        LOG_SOURCE_DIR = pros.getProperty("LOG_SOURCE_DIR");
        LOG_TEMP_DIR = pros.getProperty("LOG_TEMP_DIR");
        LOG_BACKUP_DIR = pros.getProperty("LOG_BACKUP_DIR");
        LOG_TIMEOUT = pros.getProperty("LOG_TIMEOUT");
        LOG_NAME = pros.getProperty("LOG_NAME");
        HDFS_URL = pros.getProperty("HDFS_URL");
        HDFS_DIR = pros.getProperty("HDFS_DIR");
        HDFS_F_NAME = pros.getProperty("HDFS_F_NAME");
        HDFS_L_NAME = pros.getProperty("HDFS_L_NAME");
    }


    /**
     * 判断文件夹是否为空
     * @Author chenpeng
     * @Description //TODO
     * @Date 21:19
     * @Param []
     * @return boolean
     **/
    public boolean isNullDir(){
        if (new File(LOG_SOURCE_DIR).list().length==0){
            return true;
        }
        return false;
    }
    /**
     * 移动数据到temp
     * @Author chenpeng
     * @Description //TODO
     * @Date 20:17
     * @Param []
     * @return void
     **/
    public void moveToTemp() throws IOException {
        //拿到文件
        File[] logSourceArray = new File(LOG_SOURCE_DIR).listFiles();

        //移动文件
        for (File file : logSourceArray) {
            FileUtils.moveFileToDirectory(file,new File(LOG_TEMP_DIR),true);
        }
    }

    /**
     * temp上传到hdfs
     * @Author chenpeng
     * @Description //TODO
     * @Date 20:18
     * @Param []
     * @return void
     **/
    public void setUpToHdfs() throws URISyntaxException, IOException, InterruptedException {
        //拿到全部的文件
        File[] logTempArray = new File(LOG_TEMP_DIR).listFiles();

        //获取hdfs链接
        FileSystem fileSystem =
                FileSystem.get(new URI(HDFS_URL),new Configuration(),"root");

        //获取一个时间
        SimpleDateFormat sdf = new SimpleDateFormat(TIME_MATE);
        Calendar calendar = Calendar.getInstance();
        String time = sdf.format(calendar.getTime());

        //上传文件
        for (File file : logTempArray) {
            fileSystem.copyFromLocalFile(
                    new Path(file.getAbsolutePath()),
                    new Path(HDFS_DIR+"/"+time+"/"+
                            HDFS_F_NAME+time+UUID.randomUUID()+HDFS_L_NAME));
        }

        //关闭流
        fileSystem.close();
    }

    /**
     * temp移动到backup
     * @Author chenpeng
     * @Description //TODO
     * @Date 20:18
     * @Param []
     * @return void
     **/
    public void tempToBackup() throws IOException {
        //拿到全部的文件
        File[] logTempArray = new File(LOG_TEMP_DIR).listFiles();

        //拿到时间
        SimpleDateFormat sdf = new SimpleDateFormat(TIME_MATE);
        Calendar calendar = Calendar.getInstance();
        String time = sdf.format(calendar.getTime());

        for (File file : logTempArray) {
            FileUtils.moveFileToDirectory(
                    file,new File(LOG_BACKUP_DIR+"/"+time),true);
        }
    }
}

5.4 ClearDate

package com.looc.D02数据采集demo;

import org.apache.commons.io.FileUtils;

import java.io.File;
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Properties;
import java.util.TimerTask;

/**
 * @author chenPeng
 * @version 1.0.0
 * @ClassName ClearDate.java
 * @Description TODO
 * @createTime 2019年01月28日 19:19:00
 */
public class ClearDate extends TimerTask {

    private String TIME_MATE = "yyyy-MM-dd";
    private String LOG_BACKUP_DIR;

    /**
     * The action to be performed by this timer task.
     */
    @Override
    public void run() {
        try {
            init();
            fondTimeOutFile();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (ParseException e) {
            e.printStackTrace();
        }
    }

    /**
     * 初始化路径
     * @Author chenpeng
     * @Description //TODO
     * @Date 21:25
     * @Param []
     * @return void
     **/
    public void init() throws IOException {
        Properties properties = PropertiesHolderLaze.getPros();

        LOG_BACKUP_DIR = properties.getProperty("LOG_BACKUP_DIR");
    }

    /**
     * 扫描文件夹是否存在超时文件
     * @Author chenpeng
     * @Description //TODO
     * @Date 21:26
     * @Param []
     * @return void
     **/
    public void fondTimeOutFile() throws IOException, ParseException {
        //拿到路径
        File[] fileArray = new File(LOG_BACKUP_DIR).listFiles();

        //拿到时间
        Calendar calendar = Calendar.getInstance();
        calendar.add(Calendar.DAY_OF_MONTH,-1);
        SimpleDateFormat sdf = new SimpleDateFormat(TIME_MATE);

        for (File file : fileArray) {

            if (calendar.getTime().getTime() > sdf.parse(file.getName()).getTime()){
                //删除
                FileUtils.deleteDirectory(file);
            }
        }
    }
}

5.5 Main

package com.looc.D02数据采集demo;


import java.io.IOException;
import java.util.Properties;
import java.util.Timer;

/**
 * 程序入口
 * @author chenPeng
 * @version 1.0.0
 * @ClassName Main.java
 * @Description TODO
 * @createTime 2019年01月28日 20:08:00
 */
public class Main {
    private static Integer LOG_TIMEOUT;

    public static void init() throws IOException {
        Properties properties = PropertiesHolderLaze.getPros();
        LOG_TIMEOUT = Integer.parseInt(properties.getProperty("LOG_TIMEOUT"));
    }

    public static void main(String[] args){
        Timer timer = new Timer();
        timer.schedule(new CollectionData(),0,LOG_TIMEOUT*60*60*1000L);
        timer.schedule(new ClearDate(),0,LOG_TIMEOUT*60*60*1000L);
    }
}

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 219,427评论 6 508
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,551评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 165,747评论 0 356
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,939评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,955评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,737评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,448评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,352评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,834评论 1 317
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,992评论 3 338
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,133评论 1 351
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,815评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,477评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,022评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,147评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,398评论 3 373
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,077评论 2 355

推荐阅读更多精彩内容

  • 国家电网公司企业标准(Q/GDW)- 面向对象的用电信息数据交换协议 - 报批稿:20170802 前言: 排版 ...
    庭说阅读 10,990评论 6 13
  • 关于Mongodb的全面总结 MongoDB的内部构造《MongoDB The Definitive Guide》...
    中v中阅读 31,938评论 2 89
  • 注:本文是我学习Hadoop权威指南的时候一些关键点的记录,并不是全面的知识点 Hadoop 避免数据丢失的方法:...
    利伊奥克儿阅读 738评论 0 2
  • 你好!陈俊生 文 秋雨 最近热播的《我的前半生》话题特别多,人们一边替罗子君捏了一把汗,一边大骂陈俊...
    阳光普照1阅读 377评论 0 0
  • 盗走浮华的梦境 浅留楚门的幻影 掠过南飞的白燕 惊起静坐的鱼鹰 碰碎死寂的镜湖 触发千层的圆涡 腾跃成群的银鱼 揭...
    桐月九阅读 242评论 0 3