程序、进程、线程的相关概念
程序是含有指令和数据的文件,被存储在磁盘或其他的数据存储设备中,也就是说程序是静态的代码。
进程是程序的一次执行过程,是系统运行程序的基本单位,因此进程是动态的。系统运行一个程序即是一个进程从创建,运行到消亡的过程。简单来说,一个进程就是一个执行中的程序,它在计算机中一个指令接着一个指令地执行着,同时,每个进程还占有某些系统资源如CPU时间,内存空间,文件,输入输出设备的使用权等等。换句话说,当程序在执行时,将会被操作系统载入内存中。
线程与进程相似,但线程是一个比进程更小的执行单位。一个进程在其执行的过程中可以产生多个线程。与进程不同的是同类的多个线程共享同一块内存空间和一组系统资源,所以系统在产生一个线程,或是在各个线程之间作切换工作时,负担要比进程小得多,也正因为如此,线程也被称为轻量级进程。
进程在执行过程中拥有独立的内存单元,而多个线程共享内存,从而极大地提高了程序的运行效率
创建线程
函数式创建线程
# 进程管理软件,进程之间互相独立
# 线程是一个软件内部的任务。
import threading
import time
def run(n):
print('task', n)
time.sleep(2)
print("{}finished".format(n))
t1 = threading.Thread(target=run, args=("t1",))
t2 = threading.Thread(target=run, args=("t2",))
t1.start()
t2.start()
print('taskmain finished')
target为线程函数的名字,而args为其参数,需用括号括住,且最后一个参数后需跟逗号。
由于此输出为了显示t1与t2交替工作而较特殊,不在此展示。
类方式创建线程
class MyThread(threading.Thread):
def __init__(self, n):
super().__init__()
self.n = n
def run(self):
print("task", self.n)
time.sleep(2)
print("{}finished\n".format(self.n))
t1 = MyThread('t1')
t2 = MyThread('t2')
t1.start()
t2.start()
join()
join 调用者会等待该线程结束后,再执行
class MyThread(threading.Thread):
def __init__(self, n):
super().__init__()
self.n = n
def run(self):
print("task", self.n)
time.sleep(2)
print("{}finished\n".format(self.n))
t1 = MyThread('t1')
t2 = MyThread('t2')
t1.start()
t2.start()
t2.join()
t1.join()
print('main finished...')
输出可能如下
task t1
task t2
t2finished
t1finished
main finished...
若注释掉join语句,输出可能如下
task t1
task t2
main finished...
t2finished
t1finished
join可以帮助统计运行时间
class MyThread(threading.Thread):
def __init__(self, n):
super().__init__()
self.n = n
def run(self):
print("task", self.n)
time.sleep(2)
print("{}finished\n".format(self.n))
t1 = MyThread('t1')
t2 = MyThread('t2')
t = time.time()
t1.start()
t2.start()
t2.join()
t1.join()
t1 = time.time()
print('main finished...')
print(t1-t)
# 输出的时间差为2秒左右,若注释掉join,则会让时间差降到接近0。
锁
由于线程之间的资源共享,所以在调用时可能会出现读脏数据、不可重复读、丢失修改等问题。为了解决这个问题,可以在线程上加锁
举个例子:
让两个线程同时对一个全局变量进行加操作,每个加到100。
当不加锁时:
def work1(num):
global g_num
for i in range(num):
time.sleep(0.01)
g_num += 1
print('----in work1,g_num is %d---'% g_num)
def work2(num):
global g_num
for i in range(num):
time.sleep(0.01)
g_num += 1
print('----in work2,g_num is %d---' % g_num)
print('---线程创建之前g_num is %d'%g_num)
t1 = threading.Thread(target=work1, args=(100,))
t1.start()
t2 = threading.Thread(target=work2, args=(100,))
t2.start()
while len(threading.enumerate()) != 1:
time.sleep(1)
print('两个线程操作之后的结果是 %s' % g_num)
此时输出如下
---线程创建之前g_num is 0
----in work1,g_num is 199---
----in work2,g_num is 200---
两个线程操作之后的结果是 200
我们发现这不是一个加到100,而另一个加到200,而是乱加,谁快谁加。而加上锁后:
g_num = 0
lock = threading.Lock()
# 加锁时,别的程序无法调用其中的数据
def work1(num):
global g_num
with lock:
for i in range(num):
time.sleep(0.01)
g_num += 1
print('----in work1,g_num is %d---'% g_num)
def work2(num):
global g_num
with lock:
for i in range(num):
time.sleep(0.01)
g_num += 1
print('----in work2,g_num is %d---' % g_num)
print('---线程创建之前g_num is %d'%g_num)
t1 = threading.Thread(target=work1, args=(100,))
t1.start()
t2 = threading.Thread(target=work2, args=(100,))
t2.start()
while len(threading.enumerate()) != 1:
time.sleep(1)
print('两个线程操作之后的结果是 %s' % g_num)
输出如下
---线程创建之前g_num is 0
----in work1,g_num is 100---
----in work2,g_num is 200---
两个线程操作之后的结果是 200
这下是一个100,一个200了,加上锁后,全局变量g_num无法被给它加锁的调用者以外的线程使用。
线程之间的通信
线程之间的通信较为简单,主要介绍一下生产-消费者模式
import queue
q = queue.Queue(maxsize=10)
def producor():
for i in range(1000):
q.put(i)
def customer():
for i in range(1000):
data = q.get()
print(data)
t1 = threading.Thread(target=producor)
t2 = threading.Thread(target=customer)
t1.start()
t2.start()
输出如下
0
1
2
3
4
......
997
998
999
t1(producer)往队列中加数,而t2(customer)从同一个队列中抽数。
线程池
在使用多线程处理任务时也不是线程越多越好,由于在切换线程的时候,需要切换上下文环境,依然会造成cpu的大量开销。为解决这个问题,线程池的概念被提出来了。预先创建好一个较为优化的数量的线程,放到队列中,让过来的任务立刻能够使用,就形成了线程池。
import concurrent.futures as futures
#创建线程池是通过concurrent.futures函数库中的ThreadPoolExecutor类来实现的。
#future对象:在未来的某一时刻完成操作的对象 submit方法可以返回一个future对象.
# 没有线程的创建与销毁
def add(n1,n2):
v = n1 + n2
time.sleep(n1)
return v
ex = futures.ThreadPoolExecutor(max_workers = 3)
# max_worlers 最多同时运行的线程
f1 = ex.submit(add, 2, 3)
f2 = ex.submit(add, 2, 2)
print(f1.done())
print(f1.result())# 阻塞方法 返回f1执行结果
print('finished...')
线程池例子 下载图片
import requests
import os
import random
def download_img(url):
resp = requests.get(url) # 下载地址的内容
filename = os.path.split(url)[1] # 将路径和文件名分开
with open(filename, 'wb+') as f:
f.write(resp.content)
num = random.randint(2, 5)
print(filename+' time:', num)
time.sleep(num)
return filename
urls = ['http://pic31.nipic.com/20130801/11604791_100539834000_2.jpg',
'http://pic29.nipic.com/20130507/8952533_183922555000_2.jpg',
'http://pic33.nipic.com/20131007/13639685_123501617185_2.jpg']
ex = futures.ThreadPoolExecutor(max_workers=3)
# 返回顺序为序列顺序
res_iter = ex.map(download_img, urls)
for res in res_iter:
print(res)
fu_tasks = [ex.submit(download_img, url) for url in urls]
for future in futures.as_completed(fu_tasks):
# 谁先完成就返回谁,顺序与列表顺序不同
print(future.result())
fu_tasks = [ex.submit(download_img, url) for url in urls]
# wait,返回两个集合,第一个是已经完成的任务,第二个是未完成的任务。
rs = futures.wait(fu_tasks, return_when=futures.FIRST_COMPLETED)
print(rs)
# add_done_callback(fn)
def cf(rs):
print(rs)
for url in urls:
f = ex.submit(download_img, url)
f.add_done_callback(cf)