1. Importing files through a custom-developed Flume Phoenix Sink
1) The error reported is as follows:
[conf-file-poller-0-SendThread(node2.cm.com:2181)] (org.apache.phoenix.shaded.org.apache.zookeeper.ClientCnxn$SendThread.run:1102) - Session 0x2699ed07890705c for server node2.cm.com/10.10.10.10:2181, unexpected error, closing socket connection and attempting reconnect
java.io.IOException: Connection reset by peer
at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
at sun.nio.ch.IOUtil.read(IOUtil.java:192)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
at org.apache.phoenix.shaded.org.apache.zookeeper.ClientCnxnSocketNIO.doIO(ClientCnxnSocketNIO.java:68)
at org.apache.phoenix.shaded.org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:355)
at org.apache.phoenix.shaded.org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1081)
Reviewing the Flume code showed that the Phoenix side issued a single upsert and commit per record. The flood of small commits drove the RegionServer into Full GC, which in turn caused the ZooKeeper session to time out. The code was changed to commit in batches, as shown below.
public void init(String columns) throws SQLException {
    if (!columns.equals(this.columns)) {
        this.columns = columns;
        String[] columnArray = columns.split("\001");
        // Rebuild the upsert statement; the first value comes from the Phoenix sequence.
        sql.setLength(0);
        sql.append("upsert into ").append(namespace).append(".").append(tableName)
           .append(" values( NEXT VALUE FOR ").append(this.sequenceName);
        columnTypes.clear();
        DatabaseMetaData dbmd = conn.getMetaData();
        ResultSet rs = dbmd.getColumns(null, namespace, tableName, null);
        while (rs.next()) {
            String cn = rs.getString("COLUMN_NAME").toUpperCase();
            if (cn.equals("FILEID")) {
                continue; // FILEID is not bound from the input record
            }
            // Only bind columns that are actually present in the incoming record
            for (String column : columnArray) {
                if (column.toUpperCase().equals(cn)) {
                    columnTypes.add(rs.getString("TYPE_NAME")); // remembered for type binding in upsert()
                    sql.append(",?");
                    break;
                }
            }
        }
        sql.append(")");
        // Prepare once per column layout and reuse the statement across batches.
        ps = conn.prepareStatement(sql.toString());
    }
}
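For reference, assuming a hypothetical table IOT.DEVICE_DATA, sequence IOT.DEVICE_SEQ and three matching input columns, the statement assembled by init() would read:
upsert into IOT.DEVICE_DATA values( NEXT VALUE FOR IOT.DEVICE_SEQ,?,?,?)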
public void upsert(List<List<String>> values) throws SQLException {
    // Serialize access to the shared connection and prepared statement, bind every
    // record, then send the whole batch with a single executeBatch()/commit().
    synchronized (conn) {
        for (List<String> valuesTmp : values) {
            for (int i = 0; i < columnTypes.size(); i++) {
                String columnType = columnTypes.get(i);
                int index = i + 1;
                String val = valuesTmp.get(i);
                switch (columnType) {
                    case "VARCHAR":
                        ps.setString(index, val);
                        break;
                    case "INTEGER":
                        ps.setInt(index, Integer.parseInt("".equals(val) ? "0" : val));
                        break;
                    case "BIGINT":
                        ps.setLong(index, Long.parseLong("".equals(val) ? "0" : val));
                        break;
                    case "FLOAT":
                        ps.setFloat(index, Float.parseFloat("".equals(val) ? "0.0" : val));
                        break;
                    case "DOUBLE":
                        ps.setDouble(index, Double.parseDouble("".equals(val) ? "0.00" : val));
                        break;
                    case "DATE":
                        if ("".equals(val)) {
                            ps.setObject(index, null);
                        } else {
                            ps.setDate(index, Date.valueOf(val));
                        }
                        break;
                    case "TIMESTAMP":
                        if ("".equals(val)) {
                            ps.setObject(index, null);
                        } else {
                            ps.setTimestamp(index, Timestamp.valueOf(val));
                        }
                        break;
                }
            }
            ps.addBatch(); // queue this record; nothing is sent to Phoenix yet
        }
        ps.executeBatch(); // one round trip for the whole batch
        conn.commit();     // single commit instead of one per record
        ps.clearBatch();
    }
}
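A minimal sketch of how a Flume sink's process() method could drive this batched writer is shown below. It assumes a PhoenixWriter wrapper exposing the init()/upsert() methods above and a configurable batchSize; the class and field names are illustrative, not the actual project code.
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.flume.Channel;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.sink.AbstractSink;

// Illustrative sketch only: drains up to batchSize events per Flume transaction
// and hands them to the batched upsert() shown above.
public class PhoenixBatchSink extends AbstractSink {

    private int batchSize = 1000;        // assumed configurable sink property
    private PhoenixWriter phoenixWriter; // hypothetical wrapper around init()/upsert()

    @Override
    public Status process() throws EventDeliveryException {
        Channel channel = getChannel();
        Transaction txn = channel.getTransaction();
        txn.begin();
        try {
            List<List<String>> batch = new ArrayList<>();
            for (int i = 0; i < batchSize; i++) {
                Event event = channel.take();
                if (event == null) {
                    break; // channel drained for now
                }
                String line = new String(event.getBody(), StandardCharsets.UTF_8);
                batch.add(Arrays.asList(line.split("\001", -1)));
            }
            if (!batch.isEmpty()) {
                phoenixWriter.upsert(batch); // one executeBatch()/commit() per Flume transaction
            }
            txn.commit();
            return batch.isEmpty() ? Status.BACKOFF : Status.READY;
        } catch (Exception e) {
            txn.rollback();
            throw new EventDeliveryException("Phoenix batch upsert failed", e);
        } finally {
            txn.close();
        }
    }
}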
2. Importing via Spark
Job aborted due to stage failure: Task 3 in stage 5.0 failed 4 times, most recent failure: Lost task 3.3 in stage 5.0 (TID 17, node1.cm.com, executor 2): java.lang.NumberFormatException: For input string: "P02、P08≥5.86.08"
at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
at java.lang.Integer.parseInt(Integer.java:580)
at java.lang.Integer.parseInt(Integer.java:615)
at scala.collection.immutable.StringLike$class.toInt(StringLike.scala:229)
at scala.collection.immutable.StringOps.toInt(StringOps.scala:31)
at com.databricks.spark.csv.util.TypeCast$.castTo(TypeCast.scala:61)
at com.databricks.spark.csv.CsvRelation$$anonfun$buildScan$2.apply(CsvRelation.scala:121)
at com.databricks.spark.csv.CsvRelation$$anonfun$buildScan$2.apply(CsvRelation.scala:108)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12$$anonfun$apply$4.apply$mcV$sp(PairRDDFunctions.scala:1111)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12$$anonfun$apply$4.apply(PairRDDFunctions.scala:1111)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12$$anonfun$apply$4.apply(PairRDDFunctions.scala:1111)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1279)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1119)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1091)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:242)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
The error message shows that the value "P02、P08≥5.86.08" could not be converted to a number.
Pull the HDFS data down for inspection: hdfs dfs -getmerge /user/phoenix/ tmp.dat
Opening the merged file in vi and searching for the offending value shows that the record was truncated, so the columns no longer lined up and a text value landed in a numeric column. The fix is to change the Sqoop extraction job to add --hive-drop-import-delims, which strips special characters, including line breaks, from field values.
Original job:
sqoop import --connect 'jdbc:sqlserver://xxx' --username xu --password xp \
--query 'select "state" from xx WHERE $CONDITIONS' \
--split-by id --target-dir /user/phoenix/xx/xx.xx --direct --delete-target-dir --null-string '' --null-non-string '' --fields-terminated-by '\001'
Modified job (adds --hive-drop-import-delims; note that --direct is also removed):
sqoop import --connect 'jdbc:sqlserver://xxx' --username xu --password xp \
--query 'select "state" from xx WHERE $CONDITIONS' \
--split-by id --target-dir /user/phoenix/xx/xx.xx --hive-drop-import-delims --delete-target-dir --null-string '' --null-non-string '' --fields-terminated-by '\001'
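As a quick way to confirm whether truncated records remain after re-extraction, a small field-count scan over the merged dump can flag suspect lines. This is an illustrative helper, not part of the original pipeline; the expected column count is passed in as an argument rather than assumed.
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;

// Illustrative helper: flags lines in the merged dump (e.g. tmp.dat) whose
// \001-delimited field count differs from the expected column count, which is
// how truncated or shifted records like the one above can be spotted quickly.
public class FieldCountCheck {
    public static void main(String[] args) throws IOException {
        String path = args[0];                          // e.g. tmp.dat from hdfs dfs -getmerge
        int expectedFields = Integer.parseInt(args[1]); // expected number of columns
        try (BufferedReader reader = new BufferedReader(new FileReader(path))) {
            String line;
            long lineNo = 0;
            while ((line = reader.readLine()) != null) {
                lineNo++;
                int fields = line.split("\001", -1).length;
                if (fields != expectedFields) {
                    System.out.println("line " + lineNo + ": " + fields
                            + " fields (expected " + expectedFields + ")");
                }
            }
        }
    }
}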