1. Install Airflow
pip install airflow
pip install airflow[celery,mysql,password]
(Note: newer Airflow releases are published on PyPI as apache-airflow; adjust the package name if installing 1.8.1 or later.)
* Note: on CentOS, before running pip install airflow[mysql], execute yum install -y mysql-devel python-devel python-setuptools
2. Install MySQL 5.6
Choose an install path: /home/hadoop/install/mysql5.6
1. Download the source from https://dev.mysql.com/downloads/mysql/
2. Extract: tar xvfz mysql-5.6.17.tar.gz
3. Build and install:
cmake -D MYSQL_DATADIR=/home/hadoop/install/mysql5.6/data -D SYSCONFDIR=/home/hadoop/install/mysql5.6/etc -D CMAKE_INSTALL_PREFIX=/home/hadoop/install/mysql5.6 .
make
make install
4. Configure MySQL
cd /home/hadoop/install/mysql5.6
cp support-files/my-default.cnf ./
Edit my-default.cnf, as sketched below:
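A minimal sketch; basedir/datadir come from the cmake step above, the port matches the 23306 in sql_alchemy_conn later in this document, and the socket path is an assumption:
[mysqld]
basedir = /home/hadoop/install/mysql5.6
datadir = /home/hadoop/install/mysql5.6/data
port = 23306
socket = /home/hadoop/install/mysql5.6/mysql.sock # assumed location
character_set_server = latin1 # required, see the note below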
scripts/mysql_install_db --defaults-file=./my-default.cnf # when this finishes, a my.cnf is generated in this directory
/bin/sh bin/mysqld_safe --defaults-file=./my.cnf &
Note: * character_set_server=latin1 is required; with utf8, Airflow occasionally fails at runtime with "Invalid utf8 character string: '80027D'".
* Add export LD_LIBRARY_PATH=/home/hadoop/install/mysql5.6/lib to ~/.bashrc so that Airflow uses this installation's mysql.sock; otherwise it falls back to the system default mysql.sock. See the snippet below.
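For example, append to ~/.bashrc (the PATH line is an extra convenience and an assumption, so the client in bin/ is found):
export LD_LIBRARY_PATH=/home/hadoop/install/mysql5.6/lib
export PATH=/home/hadoop/install/mysql5.6/bin:$PATH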
Create the airflow database and user:
mysql -uroot # passwordless root login
CREATE DATABASE airflow;
GRANT ALL PRIVILEGES ON airflow.* TO 'airflow'@'localhost' IDENTIFIED BY 'airflow'; # database airflow, user airflow, password airflow
FLUSH PRIVILEGES;
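A quick check that the grant works (assuming bin/mysql is on the PATH as above):
mysql -uairflow -pairflow airflow -e 'SELECT 1;'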
3. Configure Airflow
airflow initdb # once it finishes, check the airflow database to confirm the tables were created
cd ~/airflow # after installing Airflow, an airflow directory is generated under ~ by default
vim airflow.cfg # edit the settings
For the concrete values, see the production deployment configuration.
* If initdb fails with fernet key .... raise TypeError(msg) TypeError: Incorrect padding, fix it as follows:
pip install cryptography
python -c "from cryptography.fernet import Fernet; print(Fernet.generate_key().decode())"
Add the generated key to airflow.cfg: fernet_key = YOUR_GENERATED_KEY
* auth_backend = airflow.contrib.auth.backends.password_auth # the 1.8.1 cfg file ships without this parameter; be sure to add it (along with authenticate = True in the [webserver] section), otherwise you get the error "airflow.exceptions.AirflowException: Failed to import authentication backend"
The important settings are as follows (collected into a minimal airflow.cfg sketch after this list):
1. airflow_home = /home/hadoop/airflow # Airflow's home path, i.e. the auto-generated ~/airflow; useful for the "view log" links when alerts fire
2. dags_folder # directory holding the DAG files
3. base_log_folder # directory where logs from DAG runs are stored
4. executor = CeleryExecutor # we schedule with the CeleryExecutor
5. sql_alchemy_conn = mysql://airflow:airflow@localhost:23306/airflow # SQLAlchemy connection string for the metadata database
6. parallelism = 32 # maximum number of task instances running in parallel across the installation
7. dag_concurrency = 32 # maximum number of task instances the scheduler runs concurrently within a single DAG
8. plugins_folder # directory for plugins; create a new directory and point this setting at it
9. celeryd_concurrency = 32 # number of worker processes each Celery worker starts
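Putting these together, a minimal airflow.cfg sketch. The values come from this section; the folder paths are the usual defaults under airflow_home (assumptions), and note that CeleryExecutor additionally needs broker settings that this document does not cover:
[core]
airflow_home = /home/hadoop/airflow
dags_folder = /home/hadoop/airflow/dags # assumed default
base_log_folder = /home/hadoop/airflow/logs # assumed default
plugins_folder = /home/hadoop/airflow/plugins # assumed default
executor = CeleryExecutor
sql_alchemy_conn = mysql://airflow:airflow@localhost:23306/airflow
parallelism = 32
dag_concurrency = 32
fernet_key = YOUR_GENERATED_KEY
[webserver]
authenticate = True
auth_backend = airflow.contrib.auth.backends.password_auth
[celery]
celeryd_concurrency = 32
# broker_url and celery_result_backend must also be set for CeleryExecutor (not covered here)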
4. Add a user
Run the following in an interactive Python shell:
import airflow
from airflow import models, settings
from airflow.contrib.auth.backends.password_auth import PasswordUser
user = PasswordUser(models.User())
user.username = 'XXX'
user.email = 'YYY'
user.password = 'ZZZ' # use the password setter, which hashes the value; assigning user._password directly stores it unhashed and login will fail
session = settings.Session()
session.add(user)
session.commit()
session.close()
exit()
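The new user can then log in to the web UI with the username and password set above.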
5. Restart the services
ps -ef |grep 'celeryd' |grep -v grep |awk '{print $2}' |xargs kill -9
ps -ef |grep 'airflow webserver' |grep -v grep |awk '{print $2}' |xargs kill -9
ps -ef |grep 'airflow-webserver' |grep -v grep |awk '{print $2}' |xargs kill -9 # gunicorn workers show up under this name
ps -ef |grep 'airflow scheduler' |grep -v grep |awk '{print $2}' |xargs kill -9
nohup airflow worker >worker.log 2>&1 &
nohup airflow scheduler >scheduler.log 2>&1 &
nohup airflow webserver >webserver.log 2>&1 &
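To confirm everything came back up:
ps -ef |grep airflow |grep -v grep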
6. Develop your own Operator
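A minimal sketch of a custom operator, packaged as a plugin so it can be dropped into plugins_folder. The class and parameter names are illustrative assumptions, following the pattern in the post linked under section 7:

import logging
from airflow.models import BaseOperator
from airflow.plugins_manager import AirflowPlugin
from airflow.utils.decorators import apply_defaults

log = logging.getLogger(__name__)

class MyFirstOperator(BaseOperator):
    """Hypothetical example operator: logs its parameter; real work goes in execute()."""

    @apply_defaults
    def __init__(self, my_operator_param, *args, **kwargs):
        self.operator_param = my_operator_param
        super(MyFirstOperator, self).__init__(*args, **kwargs)

    def execute(self, context):
        # context carries runtime information such as the execution_date
        log.info('Hello from MyFirstOperator, param: %s', self.operator_param)

class MyPlugin(AirflowPlugin):
    # registering the operator here lets DAGs import it via airflow.operators
    name = 'my_plugin'
    operators = [MyFirstOperator]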
7. Develop your own Sensor
http://michal.karzynski.pl/blog/2017/03/19/developing-workflows-with-apache-airflow/
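Following the linked post, a minimal sensor sketch (names are illustrative assumptions). The scheduler calls poke() every poke_interval seconds until it returns True:

import logging
from datetime import datetime
from airflow.operators.sensors import BaseSensorOperator
from airflow.utils.decorators import apply_defaults

log = logging.getLogger(__name__)

class MinuteSensor(BaseSensorOperator):
    """Hypothetical example sensor: succeeds once the current minute is divisible by 3."""

    @apply_defaults
    def __init__(self, *args, **kwargs):
        super(MinuteSensor, self).__init__(*args, **kwargs)

    def poke(self, context):
        # return True to mark the task successful, False to keep waiting
        minute = datetime.now().minute
        log.info('Current minute: %s', minute)
        return minute % 3 == 0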