人生苦短,我用Python!
情景描述:
当我们在处理数据的时候,尤其对大量的文本数据或是大量的独立的数据处理的时候,如果是用单线程的方式处理的话可能会耗费大量的时间,大量到你会哭的那种。比如我现在有2000万条文本数据,我现在想将其按照正则抽取不同的规则模式,并使用分词软件分词,然后调用模型处理。在python编程的条件下,当你readlines以后一条一条处理的话,我按照之前的实验估算的话可能需要一天多,这个时候等不了果断kill掉进程,然后使用多线程的方式来进行处理。比如我们单位的服务器有48个核心。所以理论上能将速度提升48倍,也就是在半个小时就能完成处理,吃个饭就能得出结果。这部分的解决办法和示例代码已经在并行处理给出了。在这个实例中我们可以控制使用的CPU核数,不会对服务器造成过大的压力,归根结底还是要控制后台同时运行的线程数目。我之前遇到过一个项目,需要后台运行差不多1000个线程,这样服务器肯定受不了,所以我会将1000个线程分开写到5-10个bash文件中,分别调用。每次还得自己去手动的开始,所以在这里利用两个命令来对每一个后台线程分配到一个具体的cpu,这样就能保证资源的可控。
思路和用到的语言:
方案一:每一个shell命令,使用linux的fgrep的命令和taskset分别为每一个后台运行的进程指定运行的CPU,由于Linux内核在任务调度的时候需要考虑到进程的优先级和空闲的CPU,实际在执行的时候是在不同的核上分时进行的。而taskset则是可以直接指定运行的CPU。
遇到的问题,当后台需要运行的命令少的时候可以,但是当你一个非管理员用户在同时大量fork线程的时候,系统会限制用户最大能开启的线程,并且这个时候好多的线程挤在同一个CPU上,造成了线程的挂起。所以这个方案适合使用在需要同时执行的线程数不是特别多的时候。我实际的测试在线程数超过2000的时候,会出现资源限制。
python script_1.py param1 param2 .... &
taskset -cp 0-max_cpus $!
python script_2.py param1 param2 ... &
taskset -cp 0-max_cpus $!
... 中间是同样的调用shell脚本 ...
python script_n.py param1 param2 ... &
taskset -cp 0-max_cpus $!
上面的脚本文件展示了用方案一的时候,在这里每一个python调用的shell命令可以是相同的,也可以是不同的。后面的跟的参数可以看作是一组实验的参数设置,最末尾的&
符号表示将这个命令挂到后台执行。第二行的taskset是linux将特定的程序放到某个/某些CPU上的命令。当Linux运行程序的时候,会为当前的程序分配一个pid,就跟给线程一个身份证号一样。可以使用taskset -p pid
来看pid对应的程序分配到了哪个CPU上,一般是分配到全部的CPU核上然后操作系统在后台调度。使用taskset -cp 0 $!
就能将线程分配到CPU的第一个核上。在这个命令中可以指定特定的CPU,也可以使用逗号分隔的CPU列表,或是中横线表示的一个范围。后面的$!
表示的是上一个线程的pid。
taskset -pc 0,1,2,5-10 $!
这个命令表示的是将上一个程序分配到{0,1,2,5,6,7,8,9,10}这些CPU上进行自由调度,这样就能限制CPU的使用。
方案二:使用上一个方法的时候,当线程比较多,系统的资源会出现暂时无法使用的情况并且后台一下挂起很多的线程,对系统的调度可能也不是很好,这里就使用通过监测当前运行脚本的数目来特定分配,实现同一个时刻只有一个后台命令运行在特定的CPU核上。
import sys
import os, subprocess
import time
from collections import deque
import constant
def getThreadsNum(thread_name):
thread = subprocess.Popen("ps -aux | grep {thread_name}".format(
thread_name=thread_name
), shell=True, stdout=subprocess.PIPE)
out = thread.stdout.readlines()
return len(out) - 1
def runShell(cmd, cpu_index):
"""将shell命令挂到后台执行,并指定其CPU"""
# 这里一定要注意,
cmd_query = cmd.split(">")[0]
cmd_query = " ".join([item for item in cmd_query.split(" ") if item])
# 运行shell
subprocess.run(cmd, shell=True)
# 查找其pid
p = subprocess.Popen('pgrep -f "{name}"'.format(
name=cmd_query,
), shell=True, stdout=subprocess.PIPE)
out = p.stdout.readlines()
# 这里面需要选择pid小的那个
pid = min([int(item) for item in out])
# 这里stdout让程序自己来输出,可以监测进度
subprocess.run("taskset -cp {cpu_index} {pid}".format(
pid=pid,
cpu_index=cpu_index
), shell=True)
def runAll(sh_file):
"""运行全部"""
with open(sh_file) as f:
lines = [line.strip() for line in f.readlines() if line[0] != '#']
quene = deque(lines)
cpu_index = 0
while len(quene) != 0:
thread_num = getThreadsNum("predict_trend")
# 将剩余的cpu填满线程
for i in range(0, constant.MAX_CPU - thread_num):
# 到最后可能就只剩几个没有执行的了,所以这里需要实时的判断队列是不是为空
if len(quene) == 0:
break
cmd = quene.popleft()
runShell(cmd, cpu_index)
# cpu_index 的选择
cpu_index += 1
if cpu_index == constant.MAX_CPU:
cpu_index = 0
time.sleep(1)
print("全部执行完毕!")
if __name__ == "__main__":
params = sys.argv
shell_path = params[1]
runAll(shell_path)
上面的pyhton代码就是最终的解决方案,在runAll函数中,通过将所有的shell脚本中命令放到一个队列中(当然也可以不用队列),然后通过监测当前的系统中运行的脚本的数目,得出空闲的CPU核数,然后将这些核填满,相信大家自己看代码就会明白了。最后,最重要的一点就是,使用pgrep -f
这个命令的时候,后面的name需要将多个空格合并成一个空格,一般会返回两个pid,一个是我们后台运行的程序,一个就是这个命令本身也包含了,我在操作的时候,由于没有注意到多个空格的问题,得到的总是pgrep
的pid,造成将它分配到了CPU核上,而没有真正的将cmd分配,其他的地方就没有什么难处了。
当然这只是一个简单的原型,可以从我运行的图中看出来,服务器有48个核,我在这里设定的是使用其中的前35个核干活,这样剩余的核可以用来干别的事情,并且当taskset成功执行的时候,都会进行输出。
总结
在处理数据的时候,要么你去学mapreduce技术,要么就一定要掌握linux的一些命令,比如awk,join, grep, split等命令和管道技术,然后配合python的灵活性,就会有你想不到的特殊效果。