一、dask介绍
dask官网地址:https://dask.org/
优势:dask内部自动实现了分布式调度、无需用户自行编写复杂的调度逻辑和程序;通过调用简单的方法就可以进行分布式计算、并支持部分模型的并行化处理;内部实现的分布式算法:xgboost、LR、sklearn的部分方法等
用一句话说:dask就是python版本的spark,是一个用Python 语言实现的分布式计算框架
二、dask安装
1.环境
建议使用:Anaconda3工具包
系统:windows、linux
2.安装
1.conda安装:conda install dask
2.pip 安装:pip install dask
3.source安装:
git clone https://github.com/dask/dask.git
cd dask
python setup.py install
3.分布式版安装
1.conda安装:conda install dask distributed-cconda-forge
2.pip 安装:pip install dask distributed --upgrade
3.source安装:
git clone https://github.com/dask/distributed.git
cd distributed
python setup.py install
关于分布式版本安装的注意事项(针对macos)请参考官网:
https://distributed.dask.org/en/latest/install.html
三、dask集群搭建
1.启动主节点(类似注册中心)
本人实验环境:一台windows机器+3台虚拟化linux服务器,并4台机器均已按照上面步骤安装配置dask
选择Windows机器作为主节点,启动命令:
$ dask-scheduler
控制台显示信息如下:
distributed.scheduler - INFO - -----------------------------------------------
distributed.scheduler - INFO - Clear task state
distributed.scheduler - INFO - Scheduler at: tcp://192.168.1.42:8786
distributed.scheduler - INFO - :8787
distributed.scheduler - INFO - Local Directory: C:\Users\User\AppData\Local\Temp\scheduler-gd9uk980
distributed.scheduler - INFO - -----------------------------------------------
2.启动工作节点
在其他每台linux机器命令行输入:
$ dask-worker 192.168.1.42:8786
注意:后面跟的ip和端口是主节点的ip和对应服务的端口
工作节点启动成功后,此时主节点会显示多出信息:
distributed.scheduler - INFO - Register tcp://192.168.1.184:45772
distributed.scheduler - INFO - Starting worker compute stream, tcp://192.168.1.184:45772
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Register tcp://192.168.1.183:43405
distributed.scheduler - INFO - Starting worker compute stream, tcp://192.168.1.183:43405
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Register tcp://192.168.1.188:38095
distributed.scheduler - INFO - Starting worker compute stream, tcp://192.168.1.188:38095
distributed.core - INFO - Starting established connection
四、 dask集群使用
1.单机使用示例
"""单机dask"""
import time
from dask.distributed import Client
client = Client(asynchronous=True)
def square(x):
return x ** 2
def neg(x):
return -x
ts = time.time()
A = client.map(square, range(10000))
B = client.map(neg, A)
total = client.submit(sum, B)
print(total.result())
print('cost time :%s'%(time.time()-ts))
cost time :8.507587909698486
2.分布式版使用示例
"""分布式dask"""
import time
from dask.distributed import Client
client = Client('192.168.1.42:8786' ,asynchronous=True)
ts = time.time()
A = client.map(square, range(10000))
B = client.map(neg, A)
total = client.submit(sum, B)
print(total.result())
print('cost time :%s'%(time.time()-ts))
cost time :3.793848991394043
通过官网提供的测试例子可以看出dask的确体现了分布式的优势。
如果您觉得有帮助的话,可以扫码,赞赏鼓励一下!谢谢!