【开源项目】Azkaban扩展插件azkaban-plugin-jobtype-sql

Azkaban SQL jobtype

一、功能说明

本项目为Azkaban的JobType扩展插件,实现Azkaban直接调度数据库脚本的Job。

适用场景:

  • 原本数据库存在大量定时任务+存储过程进行数据处理的项目,可以通过Azkaban进行改造,实现上下级依赖控制及调度监控,使用该插件改造工作量较小。
  • 对于较小数据量的数据处理,无需用到大数据处理架构,同时数据库负载也不高的,可以使用Azkaban+插件实现数据处理。

二、安装部署

  • 下载最新发行版本,如:azkaban-plugin-jobtype-sql-1.0.1.RELEASE.zip

下载地址:https://gitee.com/centy/azkaban-plugin-jobtype-sql/releases

  • azkaban-plugin-jobtype-sql-1.0.1.RELEASE.zip拷贝至${AzkabanHome}/plugins/jobtypes/目录下,直接解压到当前目录,
    解压后目录结构如下:

    • plugins
      • sql_job
        • lib
          • ant-1.10.5.jar
          • ant-launcher-1.10.5.jar
          • azkaban-plugin-jobtype-sql-1.0.1.RELEASE.jar
          • fastjson-1.2.23.jar
          • mysql-connector-java-5.1.38.jar
          • postgresql-42.2.45.jar
        • private.properties
  • 重启Azkaban完成安装。

三、插件配置参数

Flow参数

Flow参数可配置在Flow文件内,也可定义在系统环境变量、Azkaban全局参数、Azkaban控制台定义的Flow Parameter等,具体参考Azkaban的配置读取优先级。

Azkaban文档地址:https://azkaban.readthedocs.io/en/latest/getStarted.html

参数 说明
sql_job.database.type 数据库类型,支持mysql/postgresql
sql_job.database.driver 数据库驱动类com.mysql.jdbc.Driver
sql_job.database.host 数据库主机地址
sql_job.database.port 数据库端口
sql_job.database.database 数据库名称
sql_job.database.schema 实例或模式名称,postgresql数据库必填
sql_job.database.username 数据库用户名
sql_job.database.password 数据库密码

Job参数

Job参数定义在Job节点内,用于配置每个Job的定义。

参数 说明
sql_job.scripts SQL脚本路径,支持绝对路径和相对路径。相对路径是指SQL文件相对project根目录(即上传至Azkaban的zip压缩包的根目录)的路径。
支持多个路径,多个通过英文逗号进行分隔。

四、使用说明

本文示例基于Azkaban Flow 2.0,对于Flow 1.0也支持。

快速开始

本示例实现一个两个节点的工作流

示例地址

  • 1.创建quickstart_example文件夹,在这个文件夹中创建quickstart_example.flow工作流文件。
  • 2.在quickstart_example.flow文件中添加以下flow参数,将相关配置修改为你本地的配置:
config: 
  sql_job.database.type: mysql # 支持mysql、postgresql
  sql_job.database.driver: com.mysql.jdbc.Driver
  sql_job.database.host: localhost
  sql_job.database.port: 3306
  sql_job.database.database: demo
  sql_job.database.schema:  # postgresql建议必填
  sql_job.database.username: root
  sql_job.database.password: xxxxxxx
  • 3.实现第1个Job:create_and_insert_job。
    -3.1. 在quickstart_example文件夹下创建scripts文件佳,在scripts文件夹中创建一个SQL脚本文件,命名为create_and_insert.sql,里面通过脚本创建一个表并插入测试数据。

    create table if not exists table_a(
      id int(10) not null,
      name varchar(16) not null,
      value double(20,3) not null
    );
    
    insert into table_a values
     (1,"test-1",1),
     (2,"test-2",2),
     (3,"test-3",3),
     (4,"test-4",4),
     (5,"test-5",5),
     (6,"test-6",6),
     (7,"test-7",7),
     (8,"test-8",8)
     ;
    
    select count(*) from table_a;
    
    

    -3.2. quickstart_example.flow增加Job定义,JobType为"sql_job"。

    config: 
      sql_job.database.type: mysql # 支持mysql、postgresql
      sql_job.database.driver: com.mysql.jdbc.Driver
      sql_job.database.host: localhost
      sql_job.database.port: 3306
      sql_job.database.database: demo
      sql_job.database.schema:  # postgresql建议必填
      sql_job.database.username: root
      sql_job.database.password: 123456
    
    nodes:
      - name: create_and_insert_job
        type: sql_job
        config:
          sql_job.scripts: quickstart_example/scripts/create_and_insert.sql # 脚本路径
    
  • 4.实现第2个Job:create_and_insert_job。
    -4.1. 同3.1,在scripts文件夹中创建一个SQL脚本文件,命名为create_and_insert.sql,里面通过脚本创建一个表并插入测试数据。

    update table_a set value = value*2;
    
    select count(*) from table_a;
    
    

    -4.2. quickstart_example.flow增加Job定义,JobType为"sql_job"。

    config: 
      sql_job.database.type: mysql # 支持mysql、postgresql
      sql_job.database.driver: com.mysql.jdbc.Driver
      sql_job.database.host: localhost
      sql_job.database.port: 3306
      sql_job.database.database: demo
      sql_job.database.schema:  # postgresql建议必填
      sql_job.database.username: root
      sql_job.database.password: 123456
    
    nodes:
      - name: create_and_insert_job
        type: sql_job
        config:
          sql_job.scripts: quickstart_example/scripts/create_and_insert.sql # 脚本路径
      - name: update_value_job
        type: sql_job
        dependsOn:
        - create_and_insert_job
        config:
          sql_job.scripts: quickstart_example/scripts/update_value.sql # 脚本路径
    
  • 5.将quickstart_example打包成zip包,上传Azkaban的project中,上传成功后如下图。


    image
  • 6.执行工作流,结果如下图。


    image
image

占位符参数

本插件实现SQL脚本的占位符替换,实现原理如下:

  • SQL脚本中通过"${<parameterName>}"使用占位符参数,其中parameterName命名格式为:"sql_job.<自定义参数名>[.<自定义参数名2>]"
  • 可在Azkaban配置中增加parameterName参数配置,配置参数位置可以是环境变量、全局参数、Flow参数、甚至Job参数均可,具体可参考Azkaban配置读取逻辑。
  • 插件在执行SQL脚本前,先从Azkaban的配置参数获取对应的配置值,替换SQL脚本中的占位符参数,再执行脚本。

约束条件:

  • 占位参数命名必须严格按照要求。

使用示例

示例地址

  • 1.参照上面示例,将sql脚本文件进行修改,表名都使用占位符${sql_job.tableName}替换,如下:
    create_and_insert.sql
create table if not exists ${sql_job.tableName}(
  id int(10) not null,
  name varchar(16) not null,
  value double(20,3) not null
);

insert into ${sql_job.tableName} values
 (1,"test-1",1),
 (2,"test-2",2),
 (3,"test-3",3),
 (4,"test-4",4),
 (5,"test-5",5),
 (6,"test-6",6),
 (7,"test-7",7),
 (8,"test-8",8)
 ;

select count(*) from ${sql_job.tableName};

update_value.sql

update ${sql_job.tableName} set value = value*2;

select count(*) from ${sql_job.tableName};

  • 2.在quickstart_example.flow中增加占位符参数定义,如下:
config: 
  sql_job.database.type: mysql # 支持mysql、postgresql
  sql_job.database.driver: com.mysql.jdbc.Driver
  sql_job.database.host: localhost
  sql_job.database.port: 3306
  sql_job.database.database: demo
  sql_job.database.schema:  # postgresql建议必填
  sql_job.database.username: root
  sql_job.database.password: 123456
  sql_job.tableName: table_b #增加该占位符变量定义

nodes:
  - name: create_and_insert_job
    type: sql_job
    config:
      sql_job.scripts: quickstart_example/scripts/create_and_insert.sql # 脚本路径
  - name: update_value_job
    type: sql_job
    dependsOn:
    - create_and_insert_job
    config:
      sql_job.scripts: quickstart_example/scripts/update_value.sql # 脚本路径
  • 3.上传Azkaban执行同上面示例。

五、注意事项

  • sql解释执行使用ant.jar包,该包不支持"#"号格式的注释,只支持"--"或"//"格式注释。
  • 占位符参数名必须以"sql_job."作为前缀。
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,657评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,662评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,143评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,732评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,837评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,036评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,126评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,868评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,315评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,641评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,773评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,470评论 4 333
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,126评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,859评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,095评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,584评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,676评论 2 351

推荐阅读更多精彩内容