datax+gbload+GBase入库适配操作手顺

原文链接:https://www.gbase.cn/community/post/4999

更多精彩内容尽在南大通用GBase技术社区,南大通用致力于成为用户最信赖的数据库产品供应商。

一、环境准备

1、开启vsftpd服务,注意修改,默认路径(gbload_server_9.5.25版本支持sftp协议)

userlist_enable=YES -- 设定userlist 可用,一般是deny。这个不强求。如果是,将对应用户从列表注销(ftpuser、user_list)

local_root=/opt/gbload -- 设定一个默认目录。对应下面 gbload_ftp_server_base_path 这个路径。

chroot_local_user=YES -- 限制在一个目录。

allow_writeable_chroot=YES -- 这个目录可写


2、安装gccli客户端

注意修改环境变量,参考配置文件


3、dispcli (区分python2 和3 需要申请特定注意。要python某个版本编译的)

放入目录,比如 /opt/dispcli (这里里面才是dispcli文件)


4、Java环境变量 (注意将sgloader 和jdbc 也配置进去。不一定有用,配置了肯定可以)

export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.131-11.b12.el7.x86_64

export PATH=.:$PATH:$JAVA_HOME/bin

export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:/opt/gbloader_test/ gbase-connector-java-8.3.81.53-build55.4.1-bin.jar:/opt/gbloader_test/sgLoader/sgloader-0.0.1.jar


二、gbloader部署

1、解压后有2个目录,api_file 、server_file ,比如放在/opt/gbload_server


2、创建目录用于放加载临时文件

mkdir -p /opt/ gbload_server /data/main

mkdir -p /opt/ gbload_server /data/copy

mkdir -p /opt/ gbload_server /data/kvstore


3、修改gbload_server的配置文件,在/opt/gbload_server/server_file/下面,名字叫gbload_server.conf

修改的几个内容如下:

gbload_config_server_host=10.10.3.233  #就是部署本机的IP地址

gbload_config_extern_master_server=10.10.3.233 #就是部署本机的IP地址,以后如果加载机有扩展的情况下,这里写的是master 的IP地址,只有一台加载机则就是本机的IP地址

gbload_data_key_retention_time=2 #表加载临时信息的保存时间,单位位秒,设置2就行;

gbload_ftp_server_host_name=10.10.3.233 #就是部署本机的IP地址

gbload_ftp_server_user_passwd=gbase:gbase  #连接本机ftp服务的用户名和密码,格式为user:passwd

gbload_ftp_server_base_path=/opt/gbloader_test/server_file/  #本机ftp服务的主目录,用户由gbload_ftp_server_user_passwd指定,加载服务必须对此目录有读写权限

gbload_dispatch_server_ip_port=10.10.3.235:8080 #dispatch服务的IP和端口

gbload_dispatch_client_path=/opt/gbloader_test/dispatch_server/ #dispcli所在的路径

gbload_gnode_gbloader_path=/opt/gbloader_test/server_file/ #gbloader所在的路径

gbload_database_server_host=10.10.3.109 #数据库节点的IP地址;需要制定本地gcluster/gnode使用的IP地址(GBase MPP Cluster的一个管理节点的IP)

gbload_file_count_per_controy_file=100000 #每个控制文件处理的数据文件个数


4、启动加载服务,在/opt/gbload_server/server_file下执行nohup sh ./ gbload_server_monit.sh &即可,并通过ps –ef|grep gbload_server看进程是否启动

5、gbload加载测试样例

表结构

Create database testdb;

Use testdb;

CREATE TABLE "lineorder" (

  "lo_orderkey" bigint(20) DEFAULT NULL,

  "lo_linenumber" int(11) DEFAULT NULL,

  "lo_custkey" int(11) DEFAULT NULL,

  "lo_partkey" int(11) DEFAULT NULL,

  "lo_suppkey" int(11) DEFAULT NULL,

  "lo_orderdate" int(11) DEFAULT NULL,

  "lo_orderpriority" varchar(15) DEFAULT NULL,

  "lo_shippriority" varchar(1) DEFAULT NULL,

  "lo_quantity" int(11) DEFAULT NULL,

  "lo_extendedprice" int(11) DEFAULT NULL,

  "lo_ordtotalprice" int(11) DEFAULT NULL,

  "lo_discount" int(11) DEFAULT NULL,

  "lo_revenue" int(11) DEFAULT NULL,

  "lo_supplycost" int(11) DEFAULT NULL,

  "lo_tax" int(11) DEFAULT NULL,

  "lo_commitdate" int(11) DEFAULT NULL,

  "lo_shipmode" varchar(10) DEFAULT NULL

) COMPRESS(5, 5) ENGINE=EXPRESS DISTRIBUTED BY('lo_orderkey');

Java样例代码

以下代码用的时候,修改红色加粗的IP地址就行,就是运行gbload_server的这台机器的IP地址。

url = "jdbc:gbload://" + host + ":" + port + "/" + db  + "?failoverEnable=true&hostList=" + "10.10.3.233" + "&basePath=" + sgLoader + "&sgloaderPath=" + sgLoader + "&interval=120&characterEncoding=gbk&UseOneServer=false";  // URL


import java.io.*;

import java.sql.Connection;

import java.sql.DriverManager;

import java.sql.PreparedStatement;

import java.sql.SQLException;

import java.util.ArrayList;


public class Lineorder implements Runnable{


 String host; // 加载服务IP地址

 String port = "8585"; // 端口

 String port2 = "5258";

 String db;  // 数据库名

 String table;   // 要加载的表名

 int psize;   // 每次提交的行数

 int thdCount; //线程数

 static int sleepTime = 0;  //每次提交sleep的毫秒数

 String sgLoader;

 String fileName ;


 public Lineorder(String host, String db, String table, int psize, String sgLoader, String fileName){

          this.host = host;

          this.db = db;

          this.table = table;

          this.psize = psize;

          this.sgLoader = sgLoader;

          this.fileName = fileName;

 }


 @Override

 public void run() {

          // TODO Auto-generated method stub

          String url = "jdbc:gbload://" + host + ":" + port + "/" + db  + "?failoverEnable=true&hostList=" + "10.10.3.233;10.10.3.234" + "&basePath=" + sgLoader + "&sgloaderPath=" + sgLoader + "&interval=120&characterEncoding=gbk&UseOneServer=false";  // URL


          try {

           Class.forName("cn.gbase.jdbc.LoadDriver").newInstance();

          } catch (Exception e) {

                  e.printStackTrace();

                  return;

          }


          Connection conn = null;

          PreparedStatement pstmt = null;


          try {

                  System.out.println(url);

                  conn =  DriverManager.getConnection(url, "gbase", "gbase20110531"); 

                  System.out.println("getConnection done");

          } catch (SQLException e1) {

                  e1.printStackTrace();

                  return;

          }


          if(conn == null){

                  System.err.println("conn is null");

                  return;

          }


          String sql = "insert into " + table + " values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)";

          //String sql = "insert into lineorder (lo_orderkey,lo_linenumber,lo_custkey,lo_partkey,lo_suppkey,lo_orderdate,lo_orderpriority,lo_shippriority,lo_quantity,lo_extendedprice,lo_ordtotalprice,lo_discount,lo_revenue,lo_supplycost,lo_tax,lo_commitdate,lo_shipmode) values (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)";

          File file = new File(fileName);

        BufferedReader reader = null;

          try{

            reader = new BufferedReader(new FileReader(file));

            String tempString = null;

            String[] lines = null;

                  pstmt = conn.prepareStatement(sql);     

                  String threadName = Thread.currentThread().getName();

                  int line = 0;

            while ((tempString = reader.readLine()) != null) {


                lines = tempString.split("\\|");


                pstmt.setString(1, lines[0]);

                pstmt.setString(2, lines[1]);

                pstmt.setString(3, lines[2]);

                pstmt.setString(4, lines[3]);

                pstmt.setString(5, lines[4]);

                pstmt.setString(6, lines[5]);

                pstmt.setString(7, lines[6]);

                pstmt.setString(8, lines[7]);

                pstmt.setString(9, lines[8]);

                pstmt.setString(10, lines[9]);

                pstmt.setString(11, lines[10]);

                pstmt.setString(12, lines[11]);

                pstmt.setString(13, lines[12]);

                pstmt.setString(14, lines[13]);

                pstmt.setString(15, lines[14]);

                pstmt.setString(16, lines[15]);

                pstmt.setString(17, lines[16]);


                pstmt.addBatch();

                line++;

                if(line == psize){

                 line = 0;

                 pstmt.executeBatch();

                 conn.commit();

                 pstmt.clearBatch(); 

                }

            }


         pstmt.executeBatch();

         conn.commit();

         pstmt.clearBatch();

            reader.close();

           System.out.println(threadName+": ==================>LOAD END! " );

          }catch(Exception e){

                  e.printStackTrace();

          }finally{

            if (reader != null) {

                try {

                    reader.close();

                } catch (IOException e) {

                 e.printStackTrace();

                }

            }

                  try {

                          if (pstmt != null)

                                   pstmt.close();

                  } catch (SQLException e) {

                          e.printStackTrace();

                  }


                  try {

                          if(conn != null)

                                   conn.close();

                  } catch (SQLException e) {

                          e.printStackTrace();

                  }

          }       

 }


 public static void main(String[] args) {

          String host = args[0]; // 加载服务IP地址

          String db = args[1];   // 数据库名

          String table=args[2];    // 要加载的表名

          int psize = Integer.parseInt(args[3]) ;

          int thdCount = Integer.parseInt(args[4]);

          String sgLoader = args[5]; //sgLoader路径

          String fileName = args[6];  //加载的文件名


          ArrayList<Thread> loadThreads = new ArrayList<Thread>(thdCount);


          long beginTime = System.currentTimeMillis();


          // 启动加载线程。

          for(int i=1; i<=thdCount; i++){

                  Lineorder gbl = new Lineorder(host,db,table,psize,sgLoader,fileName);

                  Thread thd = new Thread(gbl);

                  loadThreads.add(thd);

          }


          for (Thread t : loadThreads) {

                  t.start();

          }


          for (Thread t : loadThreads) {

                  try {

                          t.join();

                  } catch (InterruptedException e) {

                          e.printStackTrace();

                  }

          }


          long endTime = System.currentTimeMillis();

          double useTime = (endTime - beginTime) / 1000; // 秒


          //double speed = (psize * pcount * thdCount ) / useTime;


          //System.out.println("rowCount: "+ psize * pcount * thdCount );

          System.out.println("time: "+ useTime + " sec");

          //System.out.println("speed: " + speed + " row/sec");

 }

}


Java代码编译及执行

编译:javac Lineorder.java

执行:java Lineorder 192.168.2.188 testdb lineorder 10000 1 /opt/gbload_server/api_file/ /opt/gbload_server/data/lineorder.tbl 


执行参数说明:

Lineorder为java编译出来的class文件

192.168.2.188为gbload_server运行所在机器的IP地址;

testdb:gbase 8a MPP cluster数据库中的数据库名称;

Lineorder:表名称;

10000:每次提交的数据量,建议10000~20000,但不是绝对,根据服务器的配置灵活调整,多测试几组数据找出最有的批量提交值;

1:线程数

/opt/ gbload_server /api_file/:sgloader-0.0.1.jar所在的路径

/opt/gbload_server/data/lineorder.tbl:需要加载的数据文件

测试加载,如下图所未,gbload加载成功


三、datax部署

1、解压datax.zip到/opt目录

2、解压公司的datax_plugin_build3.0.zip解压包,对应的plugin下面有reader和writer,放到datax的plugin下对应目录下面

3、用gbload_server的gcluster-load-api-8.5.1.2-build_20241217080553.jar包,datax的plugin里gbase8amppgbloaderwriter和gbase8amppjdbcwriter对应的jar包,保持版本一致(gbload_server高版本与datax自带jar包兼容有问题)

4、 python datax.py -r mysqlreader -w gbase8amppgbloaderwriter 可以生成样例,可以根据生成的样例进行修改,生成json任务文件

5、执行python datax.py ${name}.json

任务执行最后,统计信息

原文链接:https://www.gbase.cn/community/post/4999

更多精彩内容尽在南大通用GBase技术社区,南大通用致力于成为用户最信赖的数据库产品供应商。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,657评论 6 505
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,889评论 3 394
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,057评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,509评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,562评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,443评论 1 302
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,251评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,129评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,561评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,779评论 3 335
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,902评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,621评论 5 345
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,220评论 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,838评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,971评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,025评论 2 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,843评论 2 354

推荐阅读更多精彩内容