Airflow rest_api 插件部署

一.描述

A plugin for Apache Airflow that exposes REST endpoints for the Command Line Interfaces listed in the airflow documentation:

http://airflow.apache.org/cli.html

二.可选操作

1.可能需要安装flask_jwt_extended模块
2.如果开启tooken验证,用户的request请求头需要加入rest_api_plugin_http_token

Example :curl --header "rest_api_plugin_http_token: changeme" http://{HOST}:{PORT}/admin/rest_api/api?api=version
如果未携带会返回:

  "call_time": "{TIMESTAMP}",
  "http_response_code": 403,
  "output": "Token Authentication Failed",
  "response_time": "{TIMESTAMP}",
  "status": "ERROR"
}

Working with the rest_api_plugin and JWT Auth tokens

Pass the additional Authorization:Bearer <access_token> header in the rest API request

curl -H 'Authorization:Bearer <access_token>' 'http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/admin/rest_api/api?api=version'  

If you have rest_api_plugin_http_token_header authentication enabled then you need to pass both headers as shown below.

curl -H 'Authorization:Bearer <access_token>'  -H 'rest_api_plugin_http_token: changeme' http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/admin/rest_api/api?api=version

If JWT Authorization header is missing in the request you will get the following response. (Status Code: 401)

{ "msg": "Missing Authorization Header" }

If JWT access token expires you will get the following response. (Status Code: 401)

{ "msg": "Token has expired" }

If JWT access token is invalid you will get the following response. (Status Code: 422)

{ "msg": "Invalid payload padding" }

Please refer Flask-JWT-Extended module documentation for more details

curl -XPOST http://localhost:8080/api/v1/security/login -H "Content-Type: application/json" -d '{"username":"pdc_admin", "password":"pdc_deploy@jjmatch", "refresh":true, "provider": "db"}' 
curl -H 'Authorization:Bearer eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJpYXQiOjE1NzczNDczMjksIm5iZiI6MTU3NzM0NzMyOSwianRpIjoiMjRmNTU0YjYtOWJjNS00YWU3LWEzYjYtZTM5N2QwMTNmOGRlIiwiZXhwIjoxNTc3MzQ4MjI5LCJpZGVudGl0eSI6MiwiZnJlc2giOnRydWUsInR5cGUiOiJhY2Nlc3MifQ.917nUhpl-dNCE8wgjhGXjEjSlN8jBDYkKzL2dV8NowQ'  -H 'rest_api_plugin_http_token: changeme' http://localhost:8080/admin/rest_api/api?api=version

三.部署步骤

1.新增plugins{AIRFLOW_HOME}/plugins

2.在airflow.cfg中追加plugins_folder = /home/{USER_NAME}/airflow/plugins

3.wget https://github.com/teamclairvoyant/airflow-rest-api-plugin/archive/v1.0.7.branch.zip

4.解压

unzip airflow-rest-api-plugin-v1.0.7.branch.zip
cp -r airflow-rest-api-plugin-v1.0.7.branch/plugins/* {AIRFLOW_HOME}/plugins/

5.(可选操作apitoken验证 log配置 避免过多日志信息).在airflow.cfg中追加内容

 [rest_api_plugin]
 
 # Logs global variables used in the REST API plugin when the plugin is loaded. Set to False by default to avoid too many logging messages.
 # DEFAULT: False
 log_loading = False
 
 # Filters out loading messages from the standard out 
 # DEFAULT: True
 filter_loading_messages_in_cli_response = True
 
 # HTTP Header Name to be used for authenticating REST calls for the REST API Plugin
 # DEFAULT: 'rest_api_plugin_http_token'
 #rest_api_plugin_http_token_header_name = rest_api_plugin_http_token
    
 # HTTP Token  to be used for authenticating REST calls for the REST API Plugin
 # DEFAULT: None
 # Comment this out to disable Authentication
 #rest_api_plugin_expected_http_token = changeme

6.重启Airflow Web Server

7.如果安装成功将会在Admin Tap 下面看到 REST API 的页面用来测试rest请求

四.Airflow rest_api 插件使用

1.获取airflow版本: http://{HOST}:{PORT}/admin/rest_api/api?api=version

2.获取rest_api插件版本: http://{HOST}:{PORT}/admin/rest_api/api?api=rest_api_plugin_version

3.渲染任务实例模板: 
http://{HOST}:{PORT}/admin/rest_api/api?api=render
http://{HOST}:{PORT}/admin/rest_api/api?api=render&dag_id=value&task_id=value&execution_date=2017-01-02T03:04:05&subdir=value

4.CRUD操作变量 (目前不明白能做什么事情) 类似缓存 Context可以在自定义airflow脚本中使用 也可以在jinjia模板中使用
但它是专门用于任务之间的通信而不是全局设置

note: 可将json格式的文件 导入 系统自动识变量 可序列化json

http://{HOST}:{PORT}/admin/rest_api/api?api=variables

http://{HOST}:{PORT}/admin/rest_api/api?api=variables&cmd=set&key=value&value=value&get=value&json&default=value&import=value&export=value&delete=value

For setting a variable like myVar1=myValue1 use

http://{HOST}:{PORT}/admin/rest_api/api?api=variables&cmd=set&key=myVar1&value=myValue1

使用方法:
from airflow.models import Variable
foo = Variable.get("foo")
bar = Variable.get("bar", deserialize_json=True)

echo {{ var.value.<variable_name> }}

5.操作外部connections:http://{HOST}:{PORT}/admin/rest_api/api?api=connections

6.暂停DAG:http://{HOST}:{PORT}/admin/rest_api/api?api=pause&dag_id=test_id

7.停止暂停DAG: http://{HOST}:{PORT}/admin/rest_api/api?api=unpause&dag_id=test_id

8.获取Dag中任务失败后 所以依赖的task :
http://{HOST}:{PORT}/admin/rest_api/api?api=task_failed_deps&dag_id=value&task_id=value&execution_date=2017-01-02T03:04:05

9.触发Dag运行: 
http://{HOST}:{PORT}/admin/rest_api/api?api=trigger_dag&dag_id=test_id
http://{HOST}:{PORT}/admin/rest_api/api?api=trigger_dag&dag_id=test_id&run_id=run_id_2016_01_01&conf=%7B%22key%22%3A%22value%22%7D

10.测试Dag:http://{HOST}:{PORT}/admin/rest_api/api?api=test&dag_id=value&task_id=value&execution_date=2017-01-02T03:04:05

11.获取Dag执行状态:http://{HOST}:{PORT}/admin/rest_api/api?api=dag_state&dag_id=test_id&execution_date=2017-01-02T03:04:05

12.运行单个task:
http://{HOST}:{PORT}/admin/rest_api/api?api=run&dag_id=value&task_id=value&execution_date=2017-01-02T03:04:05

13.展示一个Dag的所有Task:http://{HOST}:{PORT}/admin/rest_api/api?api=list_tasks&dag_id=test_id

14.列出所有Dags : http://{HOST}:{PORT}/admin/rest_api/api?api=list_dags

15.获取Task运行状态 : 
http://{HOST}:{PORT}/admin/rest_api/api?api=task_state&dag_id=value&task_id=value&execution_date=2017-01-02T03:04:05

16.CRUD对pool操作:

 http://{HOST}:{PORT}/admin/rest_api/api?api=pool 类似调度池 可以分配任务权重

17.部署新的dag:

curl -X POST -H 'Content-Type: multipart/form-data' -F 'dag_file=@/path/to/dag.py' -F 'force=on' http://{HOST}:{PORT}/admin/rest_api/api?api=deploy_dag

post参数:
1)dag_file:file-上传部署的py文件
2)force:可选,布尔型,文件已存在,是否强制上传
3)pause:可选,布尔型,在创建时强制暂停dag并覆盖'dags_are_paused_at_creation'配置
4)unpause(可选) - 布尔值 -  DAG在创建时将被强制取消暂停,并覆盖'dags_are_paused_at_creation'配置

18.刷新Dag:

http://{HOST}:{PORT}/admin/rest_api/api?api=refresh_dag




Api 响应参数

API Response
The API's will all return a common response object. It is a JSON object with the following entries in it:

airflow_cmd - String - Airflow CLI command being ran on the local machine
arguments - Dict - Dictionary with the arguments you passed in and their values
post_arguments - Dict - Dictionary with the post body arguments you passed in and their values
call_time - Timestamp - Time in which the request was received by the server
output - String - Text output from calling the CLI function
response_time - Timestamp - Time in which the response was sent back by the server
status - String - Response Status of the call. (possible values: OK, ERROR)
warning - String - A Warning message that's sent back from the API
http_response_code - Integer - HTTP Response code
Sample (Result of calling the versions endpoint)

{
  "airflow_cmd": "airflow version",
  "arguments": {},
  "call_time": "Tue, 29 Nov 2016 14:22:26 GMT",
  "http_response_code": 200,
  "output": "1.7.0",
  "response_time": "Tue, 29 Nov 2016 14:27:59 GMT",
  "status": "OK"
}
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,277评论 6 503
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,689评论 3 393
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 163,624评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,356评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,402评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,292评论 1 301
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,135评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,992评论 0 275
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,429评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,636评论 3 334
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,785评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,492评论 5 345
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,092评论 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,723评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,858评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,891评论 2 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,713评论 2 354

推荐阅读更多精彩内容