Phoenix Import

1. Importing files through a custom Phoenix Sink built on Flume

1) The error message was as follows:

[conf-file-poller-0-SendThread(node2.cm.com:2181)] (org.apache.phoenix.shaded.org.apache.zookeeper.ClientCnxn$SendThread.run:1102)  - Session 0x2699ed07890705c for server node2.cm.com/10.10.10.10:2181, unexpected error, closing socket connection and attempting reconnect
java.io.IOException: Connection reset by peer 
        at sun.nio.ch.FileDispatcherImpl.read0(Native Method) 
        at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
        at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
        at sun.nio.ch.IOUtil.read(IOUtil.java:192)
        at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
        at org.apache.phoenix.shaded.org.apache.zookeeper.ClientCnxnSocketNIO.doIO(ClientCnxnSocketNIO.java:68)
        at org.apache.phoenix.shaded.org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:355)
        at org.apache.phoenix.shaded.org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1081)

Inspecting the Flume code showed that the Phoenix side executed a single upsert and commit per record. The flood of small commits drove the RegionServer into Full GC, the ZooKeeper session timed out, and the connection was dropped. The code was changed to submit upserts in batches:

    // Build the upsert statement only when the incoming column list changes,
    // instead of recreating the PreparedStatement for every event.
    public void init(String columns) throws SQLException {
        if (!columns.equals(this.columns)) {
            this.columns = columns;
            // Field names arrive \001-delimited, the same separator used in the data files
            Set<String> incomingColumns = new HashSet<>();
            for (String c : columns.split("\001")) {
                incomingColumns.add(c.toUpperCase());
            }
            columnTypes.clear();
            sql.setLength(0);
            sql.append("upsert into ").append(namespace).append(".").append(tableName)
               .append(" values( NEXT VALUE FOR ").append(this.sequenceName);
            DatabaseMetaData dbmd = conn.getMetaData();
            try (ResultSet rs = dbmd.getColumns(null, namespace, tableName, null)) {
                while (rs.next()) {
                    String cn = rs.getString("COLUMN_NAME").toUpperCase();
                    if (cn.equals("FILEID")) {
                        continue; // FILEID is populated by the sequence, not by the file
                    }
                    // Only add placeholders for columns that the incoming record actually carries
                    if (incomingColumns.contains(cn)) {
                        columnTypes.add(rs.getString("TYPE_NAME"));
                        sql.append(",?");
                    }
                }
            }
            sql.append(")");
            ps = conn.prepareStatement(sql.toString());
        }
    }
    // Bind every record to the shared PreparedStatement and submit them as one batch,
    // so Phoenix receives a single large mutation and one commit instead of one per row.
    public void upsert(List<List<String>> values) throws SQLException {
        synchronized (conn) {
            for (List<String> valuesTmp : values) {
                for (int i = 0; i < columnTypes.size(); i++) {
                    String columnType = columnTypes.get(i);
                    int index = i + 1;
                    String val = valuesTmp.get(i);
                    switch (columnType) {
                        case "VARCHAR":
                            ps.setString(index, val);
                            break;
                        case "INTEGER":
                            ps.setInt(index, Integer.parseInt("".equals(val) ? "0" : val));
                            break;
                        case "BIGINT":
                            ps.setLong(index, Long.parseLong("".equals(val) ? "0" : val));
                            break;
                        case "FLOAT":
                            ps.setFloat(index, Float.parseFloat("".equals(val) ? "0.0" : val));
                            break;
                        case "DOUBLE":
                            ps.setDouble(index, Double.parseDouble("".equals(val) ? "0.00" : val));
                            break;
                        case "DATE":
                            if ("".equals(val)) {
                                ps.setObject(index, null);
                            } else {
                                ps.setDate(index, Date.valueOf(val));
                            }
                            break;
                        case "TIMESTAMP":
                            if ("".equals(val)) {
                                ps.setObject(index, null);
                            } else {
                                ps.setTimestamp(index, Timestamp.valueOf(val));
                            }
                            break;
                        default:
                            // Unknown column types are bound as plain strings
                            ps.setString(index, val);
                            break;
                    }
                }
                ps.addBatch(); // queue the row locally; nothing is sent to Phoenix yet
            }
            ps.executeBatch(); // flush all queued upserts in one round trip
            conn.commit();     // commit once per batch instead of once per record
            ps.clearBatch();
        }
    }
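For context, here is a minimal sketch of how the sink's process() method could drive init() and upsert() so that one Flume transaction maps to one Phoenix batch. The batchSize field, the "columns" event header, and the \001-delimited body layout are assumptions for illustration, not part of the original sink code:

    // Sketch only: one Flume transaction == one Phoenix batch.
    // batchSize and the "columns" header are assumed names, not taken from the original code.
    @Override
    public Status process() throws EventDeliveryException {
        Channel channel = getChannel();
        Transaction txn = channel.getTransaction();
        txn.begin();
        try {
            List<List<String>> rows = new ArrayList<>();
            for (int i = 0; i < batchSize; i++) {
                Event event = channel.take();
                if (event == null) {
                    break; // channel drained
                }
                // Assumed layout: header "columns" carries the field names,
                // the body carries the \001-delimited values of one record.
                init(event.getHeaders().get("columns"));
                String body = new String(event.getBody(), StandardCharsets.UTF_8);
                rows.add(Arrays.asList(body.split("\001", -1)));
            }
            if (!rows.isEmpty()) {
                upsert(rows); // single executeBatch + commit for the whole transaction
            }
            txn.commit();
            return rows.isEmpty() ? Status.BACKOFF : Status.READY;
        } catch (Exception e) {
            txn.rollback();
            throw new EventDeliveryException("Phoenix batch upsert failed", e);
        } finally {
            txn.close();
        }
    }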

2. Importing via Spark

Job aborted due to stage failure: Task 3 in stage 5.0 failed 4 times, most recent failure: Lost task 3.3 in stage 5.0 (TID 17, node1.cm.com, executor 2): java.lang.NumberFormatException: For input string: "P02、P08≥5.86.08"
    at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
    at java.lang.Integer.parseInt(Integer.java:580)
    at java.lang.Integer.parseInt(Integer.java:615)
    at scala.collection.immutable.StringLike$class.toInt(StringLike.scala:229)
    at scala.collection.immutable.StringOps.toInt(StringOps.scala:31)
    at com.databricks.spark.csv.util.TypeCast$.castTo(TypeCast.scala:61)
    at com.databricks.spark.csv.CsvRelation$$anonfun$buildScan$2.apply(CsvRelation.scala:121)
    at com.databricks.spark.csv.CsvRelation$$anonfun$buildScan$2.apply(CsvRelation.scala:108)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12$$anonfun$apply$4.apply$mcV$sp(PairRDDFunctions.scala:1111)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12$$anonfun$apply$4.apply(PairRDDFunctions.scala:1111)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12$$anonfun$apply$4.apply(PairRDDFunctions.scala:1111)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1279)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1119)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1091)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:242)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:

The error shows that the value "P02、P08≥5.86.08" could not be parsed as a number.
Pull the HDFS files down for inspection: hdfs dfs -getmerge /user/phoenix/ tmp.dat
Opening the merged file in vi and searching for the failing row showed that the record had been truncated, so its columns no longer lined up with the schema. The fix was to modify the Sqoop extraction job to add --hive-drop-import-delims, which drops the special delimiter characters (including newlines) from string fields; a sketch of the equivalent cleanup follows the commands below.
Original command:

sqoop import --connect 'jdbc:sqlserver://xxx' --username xu --password xp \
--query 'select  "state"     from xx WHERE $CONDITIONS' \
--split-by id  --target-dir /user/phoenix/xx/xx.xx --direct --delete-target-dir --null-string '' --null-non-string '' --fields-terminated-by '\001'

Modified command:

sqoop import --connect 'jdbc:sqlserver://xxx' --username xu --password xp \
--query 'select  "state"     from xx WHERE $CONDITIONS' \
--split-by id  --target-dir /user/phoenix/xx/xx.xx --hive-drop-import-delims --delete-target-dir --null-string '' --null-non-string '' --fields-terminated-by '\001'
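
For reference, --hive-drop-import-delims strips \n, \r, and \001 from string fields during import, so an embedded newline can no longer split a record in two. Below is a minimal Java sketch of the same cleanup done on the consuming side; the helper names are made up for illustration and are not part of the original job:

    // Hypothetical helper mirroring what --hive-drop-import-delims does on the Sqoop side:
    // remove the characters that would break the line/field layout of the export file.
    public static String dropImportDelims(String value) {
        if (value == null) {
            return "";
        }
        return value.replace("\n", "").replace("\r", "").replace("\u0001", "");
    }

    // Optional sanity check before loading: a row whose field count differs from the
    // expected column count is almost certainly a truncated or merged record.
    public static boolean hasExpectedFieldCount(String line, int expectedColumns) {
        return line.split("\u0001", -1).length == expectedColumns;
    }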