storm分布式计算与问题connection refuse排查。

由于项目需要,需要用到storm做分布式计算与数据处理,storm的原理和相关介绍就不在此赘叙了。
项目中storm下发的bolt有2层:
首先编写一个topology:
public class HomeBandToplogy {
private static final String TOPOLOGY_NAME = "HomeBandToplogy";
private static final String KAFKA_SPOUT = "kafkaSpout";
private static final String KAFKA_BOLT = "kafkaBolt";
private static final String ANYNASIS_BOLT = "AnynasisBolt";
private static final Log log = LogFactory.getLog(HomeBandToplogy.class);

public static void main(String[] args) throws AuthorizationException {
    PropertyUtil property = null;
    try {
        HBase.createTable(BaseFunction.ods_ott_userbehavior_buffer, new String[]{BaseFunction.ods_ott_userbehavior_buffer_family}, false);
        HBase.createTable(BaseFunction.ods_ott_deviceinfo_buffer, new String[]{BaseFunction.ods_ott_deviceinfo_buffer_family}, false);
        HBase.createTable(BaseFunction.ods_ott_videoinformation_buffer, new String[]{BaseFunction.ods_ott_videoinformation_buffer_family}, false);
        HBase.createTable(BaseFunction.ods_ott_videoinfo_recommend_buffer, new String[]{BaseFunction.ods_ott_videoinfo_recommend_buffer_family}, false);
        HBase.createTable(BaseFunction.ods_ott_deviceinfo_buffer, new String[]{BaseFunction.ods_ott_deviceinfo_buffer_family}, false);
    } catch (Exception e) {
        e.printStackTrace();
    }
    property = PropertyUtil.getInstance();
    String zks = property.getString("kafka.zookeeper.server", "");//master:2181,node1:2181,node2:2181
    String topic = property.getString("kafka.zookeeper.topic", "rawMessage");//消息的topic
    String zkRoot = property.getString("kafka.zookeeper.zkRoot", "/storm-kafka");//strom在zookeeper上的根,生成日志。
    String id = property.getString("kafka.zookeeper.id", "stormKafka");//自己定义的id
    BrokerHosts brokerHosts = new ZkHosts(zks);
    SpoutConfig spoutConf = new SpoutConfig(brokerHosts, topic, zkRoot, id);
    spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
    spoutConf.ignoreZkOffsets = false;
    KafkaSpout kafkaSpout = new KafkaSpout(spoutConf);
    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout(KAFKA_SPOUT, kafkaSpout, 1);
    builder.setBolt(KAFKA_BOLT, new KafkaBolt(), 16).shuffleGrouping(KAFKA_SPOUT);
    builder.setBolt(ANYNASIS_BOLT, new AnynasisBolt(), 72).shuffleGrouping(KAFKA_BOLT);
    Config config = new Config();
    config.setDebug(false);
    config.setNumWorkers(16);
    final int pendingnum = property.getInt("pendingnum", 0);
    config.setMaxSpoutPending(pendingnum);//设置最大的appending数
    if (args != null && args.length > 0) {
        try {
            StormSubmitter.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());
        } catch (AlreadyAliveException e) {
            e.printStackTrace();
        } catch (InvalidTopologyException e) {
            e.printStackTrace();
        }
    } else {
        LocalCluster localCluster = new LocalCluster();
        localCluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());
    }
}

}

然后编写一个kafkabolt和一个AnynasisBolt,如下:
kafkabolt:
public class KafkaBolt extends BaseRichBolt {
OutputCollector collector;
Log logger;

@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
    this.collector = collector;
    logger = LogFactory.getLog(KafkaBolt.class);
}

@Override
public void execute(Tuple input) {
    String line = "";
    try {
        line = input.getString(0);
        final String[] temp = line.split("#KY#");
        if (temp.length != 3) {
            logger.error("thread id " + Thread.currentThread().getId() + " kafkaBolt invalid data " + line);
        } else {
            String id = temp[0];
            final int code = Integer.valueOf(temp[1]);
            final String value = temp[2];
            AbstractBoxService info = BoxFactory.getBoxInstance(id, MobileBoxCode.valueOf(code), value);
            if (info != null) {
                collector.emit(input, new Values(id, info));
            }
        }
    } catch (Exception e) {
        logger.error("parse error :" + line + ",the casue:" + e.getMessage());
    } finally {
        collector.ack(input);
    }
}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("id", "info"));
}

}

AnynasisBolt:
public class AnynasisBolt extends BaseRichBolt {
private OutputCollector collector;
Log logger;

@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
    this.collector = collector;
    logger = LogFactory.getLog(AnynasisBolt.class);
}

@Override
public void execute(Tuple input) {
    try {
        //tuple里面是一个size为2的元组(id,info)
        final String deviceid = input.getString(0);
        AbstractBoxService info = (AbstractBoxService) input.getValues().get(1);
        boolean status = true;
        boolean result = info.executeRedis();
        if (!result) status = false;
        result = info.executeHbase();
        if (!result) status = false;
        if (!result) {
            logger.error("AnynasisBolt error.the id:" + info.getId() + ",the value:" + info.getValue() + ",the code:" + info.getCode());
        }
        collector.ack(input);
        System.out.println("AnynasisBolt.execute end...");
    } catch (Exception e) {
        collector.fail(input);
        System.out.println("error....");
    }
}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {

}

}

工厂类:
public class BoxFactory {

static Log logger = LogFactory.getLog(BoxFactory.class);

public static AbstractBoxService getBoxInstance(String deviceid, MobileBoxCode code, String value) {
    AbstractBoxService info = null;
        switch (code) {
            //设备信息
            case Device_UIXml:
                info = new Device_UIXml(deviceid, code, value);
                break;
            case Device_BootTimeDelay:
                info = new Device_BootTimeDelay(deviceid, code, value);
                break;
            case Device_BaseInfo:
                info = new Device_BaseInfo(deviceid, code, value);
                break;
            case Device_ID:
                info = new Device_ID(deviceid, code, value);
                break;
            case Device_APKVerSion:
                info = new Device_APKVerSion(deviceid, code, value);
                break;
            case Device_ConnectionMode:
                info = new Device_ConnectionMode(deviceid, code, value);
                break;
            case Device_LicencesName:
                info = new Device_LicencesName(deviceid, code, value);
                break;
            case Device_SoftVersion:
                info = new Device_SoftVersion(deviceid, code, value);
                break;
            case Device_SupplierName:
                info = new Device_SupplierName(deviceid, code, value);
                break;
            case Device_TerminalID:
                info = new Device_TerminalID(deviceid, code, value);
                break;
            case Device_TerminalMode:
                info = new Device_TerminalMode(deviceid, code, value);
                break;
            // region 用户行为
            case UserBehavior_Search:
                info = new UserBehavior_Search(deviceid, code, value);
                break;
            case UserBehavior_PosterOpenTimes:
                info = new UserBehavior_PosterOpenTimes(deviceid, code, value);
                break;
            case UserBehavior_ChannelSwitchWaitTimes:
                info = new UserBehavior_ChannelSwitchWaitTimes(deviceid, code, value);
                break;
            case UserBehavior_OpenApp:
                info = new UserBehavior_OpenApp(deviceid, code, value);
                break;
            case UserBehavior_In:
                info = new UserBehavior_In(deviceid, code, value);
                break;
            case UserBehavior_TopIn:
                info = new UserBehavior_TopIn(deviceid, code, value);
                break;

            // region 视频播放
            case VideoPlay_Start:
                info = new VideoPlay_Start(deviceid, code, value);
                break;
            case VideoPlay_End:
                info = new VideoPlay_End(deviceid, code, value);
                break;
            case VideoPlay_Error:
                info = new VideoPlay_Error(deviceid, code, value);
                break;
            case VideoPlay_FirstFrameWaitTimes:
                info = new VideoPlay_FirstFrameWaitTimes(deviceid, code, value);
                break;
            case VideoPlay_Information:
                info = new VideoPlay_Information(deviceid, code, value);
                break;
            case VideoPlay_EPGConfig:
                info = new VideoPlay_EPGConfig(deviceid, code, value);
                break;
            case VideoPlay_KartunTimes:
                info = new VideoPlay_KartunTimes(deviceid, code, value);
                break;
            case VideoPlayRecomend_End:
                info = new VideoPlayRecomend_End(deviceid, code, value);
                break;
            case VideoPlayRecomend_Start:
                info = new VideoPlayRecomend_Start(deviceid, code, value);
                break;
            // endregion
            default:
                break;
        }
    return info;
}

}
接口类:
public interface BoxService extends Serializable {

/**
 * 数据写入redis
 *
 * @return
 */
public Boolean executeRedis();


/**
 * 写入hbase
 *
 * @return
 */
public Boolean executeHbase();

}

抽象类:
**

  • 构建实例抽象类
    */
    public abstract class AbstractBoxService implements BoxService, Serializable {
    private String id;
    private MobileBoxCode code;
    private String value;

    public AbstractBoxService(String id, MobileBoxCode code, String value) {
    this.id = id;
    this.code = code;
    this.value = value;
    }

    public String getId() {
    return id;
    }

    public void setId(String id) {
    this.id = id;
    }

    public MobileBoxCode getCode() {
    return code;
    }

    public void setCode(MobileBoxCode code) {
    this.code = code;
    }

    public String getValue() {
    return value;
    }

    public void setValue(String value) {
    this.value = value;
    }

@override
public Boolean executeRedis( return null;);

@override
public Boolean executeHbase( return null;);

@Override
public String toString() {
    return "AbstractBoxService{" +
            "id='" + id + '\'' +
            ", code=" + code +
            ", value='" + value + '\'' +
            '}';
}

}

继承抽象类的实现类业务逻辑部分就不在这里描述了。

storm程序启动以后,小批量数据运行正常。
继续加大数据测试,数据量达到几十万的时候,出现异常,异常如下:
[ERROR] connection attempt 9 to Netty-Client-node5/172.16.1.100:6700 failed: java.net.ConnectException: Connection refused: node5/172.16.1.100:6700
2018-11-16 17:46:11.533 o.a.s.u.StormBoundedExponentialBackoffRetry client-boss-1 [WARN] WILL SLEEP FOR 420ms (MAX)
同时storm程序大量ack失败。
开始以后是线程数过多,以及环境资源紧张导致此种异常。
后来经过重重排查,将接口去掉,将抽象类中的方法变为抽象方法后,程序运行正常。
为何出现这样的异常呢?
原因在于,storm在处理的时候只会处理当前进程下的任务,跨进程的调度是无法实现的,故产生这样的故障。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,723评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,003评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,512评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,825评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,874评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,841评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,812评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,582评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,033评论 1 308
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,309评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,450评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,158评论 5 341
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,789评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,409评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,609评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,440评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,357评论 2 352

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,649评论 18 139
  • 1. Java基础部分 基础部分的顺序:基本语法,类相关的语法,内部类的语法,继承相关的语法,异常的语法,线程的语...
    子非鱼_t_阅读 31,621评论 18 399
  • 废话不多说,自己进入今天的主题 1、面向对象的特征有哪些方面? 答:面向对象的特征主要有以下几个方面: - 抽象:...
    传奇内服号阅读 2,347评论 1 31
  • 亲爱的母亲: 近日来,身体可还好?自我上高中以后,我们之间似乎生分了许多,我有些害怕,紧张。春雨,夏日,秋风,冬...
    不辞水阅读 985评论 6 5
  • 题目 一个数组的MaxTree定义: 数组必须没有重复元素 MaxTree是一棵二叉树,数组的每一个值对应一个二叉...
    永志阅读 974评论 0 0