flume自定义 ES SINk插件,AVRO格式数据写入ES

package com.vacp.collecor;

import com.alibaba.fastjson.JSON;

import com.alibaba.fastjson.JSONObject;

import com.google.common.base.Preconditions;

import com.google.common.base.Throwables;

import com.vacp.common.AvroHelper;

import com.vacp.common.DateUtils;

import com.vacp.common.ObjectSerializeUtils;

import com.vacp.es.ESClientFactory;

import com.videtek.kafka.VehiclePassingInfo;

import org.apache.commons.lang.StringUtils;

import org.apache.flume.*;

import org.apache.flume.conf.Configurable;

import org.apache.flume.sink.AbstractSink;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import java.util.Calendar;

import java.util.Date;

import static com.vacp.es.ESClientFactory.saveToEsBulk;

public class AvroVehicleCollectorextends AbstractSinkimplements Configurable{

private Loggerlogger = LoggerFactory.getLogger(AvroVehicleCollector.class);

    private Stringhostname;

    private Stringport;

    StringdayTime ;

    private int batchSize;

    public static boolean outlogger =false;

    private boolean bulktype=true;//默认 update

    public AvroVehicleCollector() {

logger.info("VehicleUpdateCollector start...");

    }

@Override

    public void configure(Context context) {

hostname = context.getString("hostname");

        Preconditions.checkNotNull(hostname, "es hostname must be set!!");

        dayTime = context.getString("daytime");

        Preconditions.checkNotNull(dayTime, "daytime must be set!!example:6,18");

        batchSize = context.getInteger("batchSize", 100);

        outlogger ="true".equals(context.getString("outlogger"));

        bulktype ="create".equals(context.getString("bulktype"))?false:true;

        Preconditions.checkNotNull(dayTime, "daytime must be set!!example:6,18");

        Preconditions.checkNotNull(batchSize >0, "batchSize must be a positive number!!");

        ESClientFactory.init(hostname);

    }

@Override

    public void start() {

super.start();

    }

@Override

    public void stop() {

super.stop();

    }

@Override

    public Statusprocess()throws EventDeliveryException {

Status result = Status.READY;

        Channel channel = getChannel();

        Transaction transaction =null;

        try {

transaction = channel.getTransaction();

            transaction.begin();

            Event event =null;

            String content =null;

            StringBuilder vehicleList =new StringBuilder();

            int maxsize=0;

            try {

for (int i =0; i

maxsize++;

                    event = channel.take();

                    if (event !=null) {

//对事件进行处理

                        AvroHelper helper =new AvroHelper();

                        VehiclePassingInfo  vehicle= helper.deserialize(VehiclePassingInfo.class, event.getBody());

                        content = ObjectSerializeUtils.toJSON(vehicle);

                        //System.out.println(content);

                        JSONObject vehicleMap =  JSON.parseObject(content);

                        Stringpass_time=String.valueOf(vehicleMap.get("pass_time"));

                        //时间戳 格式化yyyy-MM-dd HH:mm:ss

                        pass_time=DateUtils.formatDatestamp(pass_time);

                        if(StringUtils.isBlank(pass_time)) {

continue;

                        }

                     //HHmmss 时段查询处理优化查询速度

                    //script过滤某几个月内 每天几点到几点的数据效率不高


                        int timeNum = getTimeNum(pass_time);

                        String indexName = DateUtils.getIndexName("vacp", pass_time);

                        String typeName = DateUtils.getTypeName("vehicle", pass_time);

                        vehicleMap.put("pass_time", pass_time);

                        vehicleMap.put("timenum", timeNum);


                        vehicleMap.put("mark_time", mark_time);

                        String vehicleId = vehicleMap.getString("vehicle_id");

                        String tollgateId = vehicleMap.getString("tollgate_id");

                        Stringplate_no= vehicleMap.getString("plate_no");


vehicleMap.put("daynight", daynight);


vehicleList.append("{ \"index\":{");

                            vehicleList.append("\"_id\":\"" + vehicleId +"\",\"_index\":\"" + indexName +"\",\"_type\":\"" + typeName +"\"}}");

                            vehicleList.append("\r\n");

                            vehicleList.append(JSON.toJSONString(vehicleMap));

                            vehicleList.append("\r\n");


}else {

result = Status.BACKOFF;

break;

                    }

}

}catch (Exception e){

logger.error("channel.take();." +maxsize, e);

                Throwables.propagate(e);

            }

if (vehicleList.length() >0) {

//提交失败不做回滚

                  if(outlogger) {

System.out.println("==========POST Vehicle JSON====================");

                      logger.info(vehicleList.toString());

              }

saveToEsBulk(vehicleList.toString());

            }

result = Status.READY ;

            transaction.commit();//通过 commit 机制确保数据不丢失

        }catch (Exception e) {

transaction.rollback();

            e.printStackTrace();

            logger.error("Failed to commit transaction." +

"Transaction rolled back.", e);

            Throwables.propagate(e);

        }finally {

if (transaction !=null) {

transaction.close();

                logger.debug("close Transaction");

            }

}

return result;

    }

private int checkDayTime(String passTime){

Date dt = DateUtils.parseDate(passTime);

        Calendar cal  = Calendar.getInstance();

        cal.setTime(dt);

        int hour = cal.get(Calendar.HOUR_OF_DAY);

        int start = Integer.parseInt(dayTime.split(",")[0]);

        int end = Integer.parseInt(dayTime.split(",")[1]);

        if(hour>=start&&hour>=end) {

return 1;

        }else{

return 0;

        }

}

private  int getTimeNum(String passTime){

return  Integer.valueOf(passTime.substring(11).replace(":","")+"00");

    }

}

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,860评论 18 139
  • rljs by sennchi Timeline of History Part One The Cognitiv...
    sennchi阅读 7,424评论 0 10
  • 1 不知道大家有没有发现,中国今天整个商业环境正在发生变化。 一来是创业群体越来越年轻,如果今天我们出去应酬一番,...
    笨拙的人阅读 411评论 0 0
  • 天帝发旨诏他去,体弱多病跪身躯。 七岁成诗名人识,小人中伤难入仕。 呕心沥血母亲忧,孤魂野鬼墓边愁。 游历四方年军...
    阿荣部分失落的作品阅读 697评论 0 1
  • 温暖如诗 温暖如你 2018.1.7 赤水 小雨 琴弦断了,你守候在琴弦边,默默流泪 燕子飞走了,找不到春天,...
    冯玙哲阅读 310评论 0 3