扫二维码与项目经理沟通
我们在微信上24小时期待你的声音
解答本文疑问/技术咨询/运营咨询/技术建议/互联网交流
Airflow 是一个 Airbnb 的 Workflow 开源项目,在Github 上已经有超过两千星。Airflow 使用 Python 写的,支持 Python 2/3 两个版本。 传统 Workflow 通常使用 Text Files (json, xml / etc) 来定义 DAG, 然后 Scheduler 解析这些 DAG 文件形成具体的 Task Object 执行;Airflow 没这么干,它直接用 Python 写 DAG definition, 一下子突破了文本文件表达能力的局限,定义 DAG 变得简单。 另外,Airflow 的权限设计、限流设计、以及 Hook/Plugin 的设计都挺有意思,功能性、扩展性良好。当然,项目里的代码质量感觉比较一般,很多地方函数名和实现不太一致,造成理解障碍;也有很多 Flag 和重复出现的定义,显然是当初没有设计好、后面没有精力 Refactor 转而 Hack 的结果。但总体上,可读性中上,系统的扩展性非常好。
创新互联建站专注为客户提供全方位的互联网综合服务,包含不限于网站设计、网站建设、金寨网络推广、微信小程序定制开发、金寨网络营销、金寨企业策划、金寨品牌公关、搜索引擎seo、人物专访、企业宣传片、企业代运营等,从售前售中售后,我们都将竭诚为您服务,您的肯定,是我们最大的嘉奖;创新互联建站为所有大学生创业者提供金寨建站搭建服务,24小时服务热线:13518219792,官方网址:www.cdcxhl.com
安装依赖程序 ;
[root@node01 ~]# yum -y install zlib zlib-devel bzip2 bzip2-devel ncurses ncurses-devel readline readline-devel openssl openssl-devel openssl-static xz lzma xz-devel sqlite sqlite-devel gdbm gdbm-devel tk tk-devel gcc
下载python ;
可以前往 https://www.python.org/ftp/python/查看Python各个版本,这里,我们选择安装Python-3.6.5.tgz版本。通过如下命令下载Python源码压缩包:[root@node01 ~]# wget https://www.python.org/ftp/python/3.6.5/Python-3.6.5.tgz
解压Python源码压缩包 ;
[root@node01 ~]# tar -zxvf Python-3.6.5.tgz [root@node01 ~]# cd Python-3.6.5
安装python ;
[root@node01 Python-3.6.5]# ./configure prefix=/usr/local/python3 [root@node01 Python-3.6.5]# make [root@node01 Python-3.6.5]# make install [root@node01 Python-3.6.5]# mv /usr/bin/python /usr/bin/python2 #将原来的执行脚本改名备份 [root@node01 Python-3.6.5]# ln -s /usr/local/python3/bin/python3 /usr/bin/python #将新的python执行脚本链接到命令路径内 [root@node01 Python-3.6.5]# ln -s /usr/local/python3/bin/pip3 /usr/bin/pip #将pip3链接到命令路径内
查看版本信息 ;
- 修改yum相关配置文件 ;
将以下两个文件的开头 #!/usr/bin/python 改成 #!/usr/bin/python2.7[root@node01 Python-3.6.5]# vim /usr/bin/yum [root@node01 Python-3.6.5]# vim /usr/libexec/urlgrabber-ext-down
默认python 3.6.5 已经自带pip3 ,不需要再额外进行安装。
如果是python2.x 就需要进行安装pip 。
解压MySQL软件(文件大小六百多兆,官网直接下载免安装)
[root@node01 ~]# tar zxvf mysql-5.7.28-linux-glibc2.12-x86_64.tar.gz [root@node01 ~]# mv mysql-5.7.28-linux-glibc2.12-x86_64 /opt/mysql [root@node01 ~]# cd /opt/mysql/
创建MySQL运行需要的用户跟组
[root@node01 mysql]# groupadd mysql [root@node01 mysql]# useradd -M -s /sbin/nologin -g mysql mysql
创建存放数据的目录
[root@node01 mysql]# mkdir data [root@node01 mysql]# chown -R mysql:mysql *
编写配置文件
vim /etc/my.cnf[client] port = 3306 socket = /opt/mysql/data/mysql.sock [mysqld] port = 3306 socket = /opt/mysql/data/mysql.sock basedir = /opt/mysql datadir = /opt/mysql/data user = mysql bind-address = 0.0.0.0 server-id = 1 init-connect = 'SET NAMES utf8' character-set-server = utf8
初始化数据库
[root@node01 mysql]# bin/mysqld --initialize --basedir=/opt/mysql --datadir=/opt/mysql/data --user=mysql
将数据库添加到服务内
[root@node01 mysql]# cp support-files/mysql.server /etc/init.d/mysqld [root@node01 mysql]# cp /opt/mysql/bin/* /usr/sbin/ [root@node01 mysql]# chmod +x /etc/init.d/mysqld
- 添加开机自启
[root@node01 mysql]# chkconfig --add mysqld [root@node01 mysql]# service mysqld start
登陆 MySQL, 密码在之前初始化时有提示的
[root@mysql_node01 mysql]# mysql -uroot -p
修改root密码,否则什么都做不了
mysql> alter user root@"localhost" identified by "123456"; Query OK, 0 rows affected (0.00 sec)
- 测试账号是否可以增删
mysql> create database abctest; Query OK, 1 row affected (0.00 sec)
mysql> use abctest;
Database changed
mysql> drop database abctest;
Query OK, 0 rows affected (0.00 sec)
> 12. 授权airflow用户可被访问 ;
```bash
mysql> grant all on *.* to 'airflow'@'%' identified by '123456a';
Query OK, 0 rows affected, 1 warning (0.00 sec)
mysql> flush privileges;
Query OK, 0 rows affected (0.00 sec)
创建airflow 要用的数据库,库名 airflow ;
mysql> create database airflow ; Query OK, 1 row affected (0.00 sec)
- 开启timestamp ,数据更新时添加上当前时间 ;
mysql> set @@global.explicit_defaults_for_timestamp=on; Query OK, 0 rows affected (0.00 sec)
解压redis (如需下载请直接到官网进行下载);
[root@node01 ~]# tar zxvf redis-5.0.0.tar.gz [root@node01 ~]# cd redis-5.0.0/
安装redis ;
[root@node01 redis-5.0.0]# make && make test [root@node01 redis-5.0.0]# mkdir /usr/local/redis #创建redis运行的目录 [root@node01 redis-5.0.0]# cp redis.conf /usr/local/redis/ #复制配置文件到redis运行目录 [root@node01 redis-5.0.0]# cp src/redis-server /usr/sbin/ #复制redis-server程序 [root@node01 redis-5.0.0]# cp src/redis-cli /usr/sbin/ #复制redis-cli程序 [root@node01 redis-5.0.0]# cp src/redis-sentinel /usr/sbin/ #复制redis-sentinel 程序
编辑redis 配置文件
[root@node01 redis-5.0.0]# vim /usr/local/redis/redis.conf bind 0.0.0.0 #对外的绑定地址 daemonize yes #守护进程 pidfile /usr/local/redis/redis_6379.pid #指定pid logfile "/usr/local/redis/redis.log" #指定log dir "/usr/local/redis" #指定家目录 appendonly yes #开启持久化
启动redis ;
[root@node01 redis-5.0.0]# redis-server /usr/local/redis/redis.conf
- 连接redis 进行测试
[root@node01 ~]# redis-cli 127.0.0.1:6379> keys * (empty list or set)
安装rabbitmq跟erlang (程序直接到官网下载即可);
[root@node01 ~]# yum localinstall erlang-20.3-1.el7.centos.x86_64.rpm rabbitmq-server-3.7.5-1.el7.noarch.rpm -y
拷贝配置文件模板,然后修改配置文件(开启默认登录用户);
[root@node01 ~]# cp /usr/share/doc/rabbitmq-server-3.7.5/rabbitmq.config.example /etc/rabbitmq/rabbitmq.config [root@node01 ~]# vim /etc/rabbitmq/rabbitmq.config
- 开启管理插件,启动程序 ;
[root@node01 ~]# rabbitmq-plugins enable rabbitmq_management #开启管理插件 [root@node01 ~]# chown rabbitmq:rabbitmq /var/lib/rabbitmq/.erlang.cookie #设置cookie 文件权限(否则无法启动) [root@node01 ~]# systemctl start rabbitmq-server.service #启动rabbitmq [root@node01 ~]# systemctl enable rabbitmq-server.service #添加开机自启
打开管理页面 http://ip:15672 ,使用默认用户guest ,密码guest 进入 ;
- 点击admin -->> add a user ,添加用户密码并设置administrator 权限 ;
- 为admin用户添加 虚拟目录 / ;
单击用户admin
选择对应的Virtual Host ,单击 Set permission 进行设置 ;
重新查看,发现已经设置成功 ;
将所有守护进程运行在同一台机器上即可完成 airflow 的单结点部署,架构如下图所示
- 安装依赖( airflow[mysql] 需要用到 );
[root@node01 ~]# yum install mariadb-devel MySQL-python -y [root@node01 ~]# mkdir /usr/local/mysql #创建目录
[root@node01 ~]# find / -name mysql.h #查看mysql.h 发现有两个,因为node01有安装MySQL,所以有两个文件,在安装airflow[mysql]时会调用/usr/local/mysql/include/mysql.h 文件
/usr/include/mysql/mysql.h
/opt/mysql/include/mysql.h
[root@node01 ~]# ln -s /opt/mysql /usr/local/mysql # 将MySQL安装目录链接到/usr/local/mysql
[root@node01 airflow]# ln -s /opt/mysql/lib/libmysqlclient.so.20 /usr/lib64/libmysqlclient.so.20
> 2. 添加环境变量
vim /etc/profile
export PATH=$PATH:/usr/local/python3/bin/
export AIRFLOW_HOME=~/airflow
> 3. 安装airflow 及相关应用 ;
[root@node01 ~]# pip install --upgrade pip #更新pip
[root@node01 ~]# pip install --upgrade setuptools #更新setuptools
[root@node01 ~]# pip install apache-airflow #安装airflow 主程序
[root@node01 ~]# pip install apache-airflow[mysql] #安装mysql连接器
[root@node01 ~]# pip install apache-airflow[celery] #安装celery 连接器
[root@node01 ~]# pip install redis #安装redis连接器
[root@node01 ~]# pip install --upgrade kombu #更新kombu (celery 需要最新的版本)
> 4. 初始化airflow 数据库 ;
[root@node01 ~]# airflow initdb #或者可以只使用airflow 命令加载配置文件
> 5. 初始化完之后会产生airflow相关的一些文件,查看airflow目录的内容 ;
![在这里插入图片描述](/upload/otherpic50/20191227093123311.png)
> 6. 修改配置文件 ;
vim /root/airflow/airflow.cfg
时区设置
default_timezone = Asia/Shanghai
不加载案例
load_examples = False
执行webserver默认启动端口
web_server_port = 8080
使用的执行器
executor = CeleryExecutor
设置消息的中间代理
broker_url = redis://192.168.255.16:6379/0
设定结果存储后端 backend
当然也可以使用 Redis :result_backend =redis://redis:Kingsoftcom_123@172.19.131.108:6379/1
result_backend = db+mysql://airflow:123456a@192.168.255.16:3306/airflow
数据库连接
sql_alchemy_conn = mysql://airflow:123456a@192.168.255.16:3306/airflow
> 7. 开启timestamp ,数据更新时添加上当前时间 ;
mysql -uairflow -p
mysql> set @@global.explicit_defaults_for_timestamp=on;
Query OK, 0 rows affected (0.00 sec)
> 8. 重新生成数据库文件 ;
airflow initdb
> 9. 启动
[root@node01 airflow]# airflow webserver -p 8080 -D
[root@node01 airflow]# airflow scheduler -D
[root@node01 airflow]# airflow flower -D
[root@node01 airflow]# airflow worker -D #默认启动会报错,因为worker不允许被ROOT用户启动
> 10. 设置worker 可以被root用户启动
> 在 /etc/profile 文件内添加 export C_FORCE_ROOT=True 内容 ,source /etc/profile 重新加载即可
> 11. 查看单节点是否已经搭建好
> 分别打开http://192.168.255.11:8080 跟 http://192.168.255.11:5050 查看是否ok
![在这里插入图片描述](/upload/otherpic50/20191227093149183.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2dxajU0MDgyMTYx,size_16,color_FFFFFF,t_70)
![在这里插入图片描述](/upload/otherpic50/20191227093156143.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2dxajU0MDgyMTYx,size_16,color_FFFFFF,t_70)
### airflow 多节点(集群)部署
> 在稳定性要求较高的场景,如金融交易系统中,一般采用集群、高可用的方式来部署。Apache Airflow 同样支持集群、高可用的部署,airflow 的守护进程可分布在多台机器上运行,架构如下图所示:
#### 架构图
![在这里插入图片描述](/upload/otherpic50/20191227093258334.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2dxajU0MDgyMTYx,size_16,color_FFFFFF,t_70)
##### 多节点好处
> 高可用
> 如果一个 worker 节点崩溃或离线时,集群仍可以被控制的,其他 worker 节点的任务仍会被执行。
> 分布式
> 处理如果你的工作流中有一些内存密集型的任务,任务最好是分布在多台机器上运行以便得到更快的执行。
#### 扩展 worker 节点
##### 水平扩展
> 你可以通过向集群中添加更多 worker 节点来水平地扩展集群,并使这些新节点指向同一个元数据库,从而分发处理过程。由于 worker 不需要在任何守护进程注册即可执行任务,因此所以 worker 节点可以在不停机,不重启服务下的情况进行扩展,也就是说可以随时扩展。
##### 垂直扩展
> 你可以通过增加单个 worker 节点的守护进程数来垂直扩展集群。可以通过修改 airflow 的配置文件-{AIRFLOW_HOME}/airflow.cfg 中 celeryd_concurrency 的值来实现,例如:
celeryd_concurrency = 30
> 您可以根据实际情况,如集群上运行的任务性质,CPU 的内核数量等,增加并发进程的数量以满足实际需求。
#### 扩展 Master 节点(高可用)
> 您还可以向集群中添加更多主节点,以扩展主节点上运行的服务。您可以扩展 webserver 守护进程,以防止太多的 HTTP 请求出现在一台机器上,或者您想为 webserver 的服务提供更高的可用性。需要注意的一点是,每次只能运行一个 scheduler 守护进程。如果您有多个 scheduler 运行,那么就有可能一个任务被执行多次。这可能会导致您的工作流因重复运行而出现一些问题。
> 下图为扩展 Master 节点的架构图:
![在这里插入图片描述](/upload/otherpic50/20191227093318702.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2dxajU0MDgyMTYx,size_16,color_FFFFFF,t_70)
> 看到这里,可能有人会问,scheduler 不能同时运行两个,那么运行 scheduler 的节点一旦出了问题,任务不就完全不运行了吗?
>
> 答案: 这是个非常好的问题,不过已经有解决方案了,我们可以在两台机器上部署 scheduler ,只运行一台机器上的 scheduler 守护进程 ,一旦运行 scheduler 守护进程的机器出现故障,立刻启动另一台机器上的 scheduler 即可。我们可以借助第三方组件 airflow-scheduler-failover-controller 实现 scheduler 的高可用。
>
> 两台master 都安装并运行 failover ,这样才能实现真正的高可用
> 具体步骤如下所示:
> 1. 下载 failover
git clone https://github.com/teamclairvoyant/airflow-scheduler-failover-controller
> 2. 使用 pip 进行安装
cd airflow-scheduler-failover-controller
pip install -e .
> 3. 初始化 failover
scheduler_failover_controller init
> 注:初始化时,会向airflow.cfg中追加内容,因此需要先安装 airflow 并初始化。
> 4. 更改 failover 配置
scheduler_nodes_in_cluster= host1,host2
> 注:host name 可以通过scheduler_failover_controller get_current_host命令获得
> 5. 配置安装 failover 的机器之间的免密登录,配置完成后,可以使用如下命令进行验证:
scheduler_failover_controller test_connection
> 6. 启动 failover
scheduler_failover_controller start
> 注意:在failover 监控scheduler 时会出现无法启动的情况,此时应该修改 /root/airflow/airflow.cfg 的
airflow_scheduler_start_command = export AIRFLOW_HOME=/root/airflow;;nohup airflow scheduler >> ~/airflow/logs/scheduler.logs &
改为 airflow_scheduler_start_command = airflow scheduler -D
> 因此更健壮的架构图如下所示:
![在这里插入图片描述](/upload/otherpic50/20191227093337590.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2dxajU0MDgyMTYx,size_16,color_FFFFFF,t_70)
##### 队列服务及元数据库(Metestore)的高可用。
> 队列服务取决于使用的消息队列是否可以高用可部署,如 RabbitMQ 和 Redis。
> RabbitMQ 集群并配置Mirrored模式见:http://blog.csdn.net/u010353408/article/details/77964190
> 元数据库(Metestore) 取决于所使用的数据库,如 Mysql 等。
> Mysql 做主从备份见:http://blog.csdn.net/u010353408/article/details/77964157
#### airflow 集群部署的具体步骤
##### 前提条件
> 节点运行的守护进程如下:
* master1 运行: webserver, scheduler
* master2运行:webserver
* worker1运行:worker
* worker2运行:worker
> 队列服务处于运行中. (RabbitMQ, Redis, etc)
* 安装 RabbitMQ 方法参见: http://site.clairvoyantsoft.com/installing-rabbitmq/
* 如果正在使用 RabbitMQ, 推荐 RabbitMQ 也做成高可用的集群部署,并为 RabbitMQ 实例配置负载均衡。
##### 步骤
> 1、 在所有需要运行守护进程的机器上安装 Apache Airflow。
> 2、修改 {AIRFLOW_HOME}/airflow.cfg 文件,确保所有机器使用同一份配置文件。
> 1. 修改 Executor 为 CeleryExecutor
executor = CeleryExecutor
> 2. 指定元数据库(metestore)
sql_alchemy_conn = mysql://{USERNAME}:{PASSWORD}@{MYSQL_HOST}:3306/airflow
> 3. 设置中间人(broker)
> 如果使用 RabbitMQ
broker_url = amqp://guest:guest@{RABBITMQ_HOST}:5672/
> 如果使用 Redis
broker_url = redis://{REDIS_HOST}:6379/0 #使用数据库 0
> 4. 设定结果存储后端 backend
celery_result_backend = db+mysql://{USERNAME}:{PASSWORD}@{MYSQL_HOST}:3306/airflow
#当然您也可以使用 Redis :celery_result_backend =redis://{REDIS_HOST}:6379/1
> 3、在 master1 和 master2 上部署您的工作流(DAGs)。
> 4、在 master 1,初始 airflow 的元数据库
$ airflow initdb
> 5、在 master1, 启动相应的守护进程
$ airflow webserver
$ airflow scheduler
> 6、在 master2,启动 Web Server
$ airflow webserver
> 7、在 worker1 和 worker2 启动 worker
$ airflow worker
> 8、使用负载均衡处理 webserver
> 可以使用 nginx,AWS 等服务器处理 webserver 的负载均衡,所有均已集群或高可用部署,apache-airflow 系统已坚不可摧。
我们在微信上24小时期待你的声音
解答本文疑问/技术咨询/运营咨询/技术建议/互联网交流