1. 内存管理机制
1.1 赋值语句的内存分析
- 赋值语句都是“引用”,可以这样理解,但是,这种“引用”是可以改变指向的
- 非赋值语句时
- 对于可变类型的数据, 创建一块新内存(Set、Dictionary、List)
- 对于不可变数据类型
简单数据,存在使用“引用”,不存在创建新内存
复杂大数据,创建新内存
1.2 垃圾回收机制
1.2.1 垃圾自动回收
-
以引用计数为主,分代收集为辅
# 引用计数 # 1.每个对象都有存有指向该对象的引用总数 # 2.查看某个对象的引用计数 --> sys.getrefcount(obj) # 3.可以使用del关键字删除某个引用
# 分代回收 # python将所有的对象分为0,1,2三代 # 所有的新建对象都是0代对象 # 当某一代对象经历垃圾回收,依然存活,那么它就被归入下一代对象
如果一个对象的引用数为0,python虚拟机就会回收这个对象的内存
引用计数的缺陷是循环引用问题
-
垃圾自动回收总结
# 垃圾自动回收 # 满足特定条件,自动启动垃圾回收 # 当python运行时,会记录其中分配对象(object allocation)和取消分配对象(object deallocation)的次数 # 当两者的差值高于某个阀值时,垃圾回收才会启动 # 查看阀值gc.get_threshold()
1.2.2 垃圾手动回收
# 垃圾手动回收
# gc.collect()手动回收
# objgraph模块中的count(obj)记录当前类产生的实例对象的个数
1.3 内存管理机制
1.3.1 内存池机制
当创建大量消耗小内存的对象时,频繁调用new/malloc会导致大量的内存碎片,致使效率降低。内存池的概念就是在内存中申请一定数量的,大小相等的内存块留作备用,当有新的内存需求时,就先从内存池中分配内存给这个需求,不够了之后再申请新的内存。这样做最显著的优势就是能够减少内存碎片,提升效率
1.3.2 Python3内存管理机制——Pymalloc
- 针对小对象(<=512bytes),pymalloc会在内存池中申请内存空间
- 当>512bytes,则会PyMem_RawMalloc和PyMem_RawRealloc()来申请新的内存空间
2. Python多线程
2.1 进程、线程、协程介绍
2.1.1 关系介绍
- 都运行在操作系统
- 一个应用中至少一个进程
- 一个进程中至少一个线程
- 协程在一个进程或者一个线程中执行
2.1.2 进程介绍
- 进程是一个执行中的程序
- 每个进程都拥有自己的地址空间、内存、数据栈以及其他用于跟踪执行的辅助数据
- 操作系统管理其上所有进程的执行,并为这些进程合理地分配时间
- 进程也可以通过派生(fork或spawn)新的进程来执行其他任务
2.1.3 线程介绍
在同一个进程下执行,并共享相同的上下文
一个进程中的各个线程与主线程共享同一片数据空间
线程包括开始、执行顺序和结束三部分
它可以被抢占(中断)和临时挂起(睡眠)--让步
-
线程一般是以并发方式执行
# 并发 # 并发是一种属性,是程序、算法或问题的属性 # 并行只是并发问题的可能方法之一 # 如果两个事件互不影响,则两个事件是并发的
2.1.4 协程介绍
- 协程就是协同多任务
- 协程在一个进程或者一个线程中执行
- 不需要锁的机制(自己调动,在一个进程或者一个线程中执行)
- 对多核CPU的利用——多进程+协程
2.2 对多核利用及GIL概念
2.2.1 对多核的利用
- 单核CPU系统中,不存在真正的并发
- GIL——全局解释器锁
- GIL只是强制在任何时候只有一个线程可以执行python代码
- I/O密集型应用与CPU密集型应用
2.2.2 GIL执行顺序
- 设置GIL
- 切换进一个线程去运行
- 执行下面操作之一
- 指定数量的字节码zhiling
- 线程主动让出控制权(可以调用time.sleep(0)来完成)
- 把线程设置回睡眠状态(切换出线程)
- 解锁GIL
- 重复上述步骤
2.3 线程
2.3.1 线程的threading模块(对象、属性、方法)
threading
模块可以代替thread
模块
对象 | 描述 |
---|---|
Thread | 表示一个执行线程的对象 |
Lock | 锁对象 |
RLock | 可重入锁对象,使单一线程可以(再次)获得已持有的锁(递归锁) |
Condition | 条件变量对象,使得一个线程等待另一个线程满足特定的“条件”,比如改变状态或某个数据值 |
Event | 条件变量的通用版本,任意数量的线程等待某个事件的发生,在该事件发生后所有线程将被激活 |
Semaphore | 为线程间共享的有限资源提供了一个“计时器”,如果没有可用资源时会被阻塞 |
BoundedSemap | 与Semaphore相似,不过它不允许超过初始值 |
Timer | 与Thread相似,不过它要在运行前等待一段时间 |
Barrier | 创建一个“障碍”,必须达到指定数量的线程后才可以继续 |
属性 | 描述 |
---|---|
name | 线程名 |
ident | 线程的标识符 |
daemon | 布尔标志,表示这个线程是否是守护线程 |
方法 | 描述 |
---|---|
__init__() | 实例化一个线程对象,需要有一个可调用的target,以及其参数args或kwargs |
start() | 开始执行该线程 |
run() | 定义线程功能的方法(通常在子类中被重写) |
join(timeout=None) | 直至启动的线程终止之前一直挂起;除非给出了timeout(秒),否则会一直阻塞 |
getName() | 返回线程名 |
setName(name) | 设定线程名 |
isAlivel()/is_alive() | 布尔标识,表示这个线程是否还存活 |
isDaemon() | 如果是守护线程,则返回True;否则,返回False |
setDaemon() | 把线程的守护标志设置为布尔值daemonic(必须在线程start()之前调用) |
2.3.2 不面向对象实现线程
threading.Thead
创建线程start()
启动线程-
join()
挂起线程import threading def run(): for i in range(1,5): print(threading.currentThread().getName()+"---"+str(i)) def main(): print(threading.currentThread().getName()) thread=threading.Thread(target=run,name="thread-loop") thread.start() thread.join() if __name__ == '__main__': main() Result: MainThread thread-loop---1 thread-loop---2 thread-loop---3 thread-loop---4
2.3.3 面向对象实现线程
import threading,time
class MyThread(threading.Thread):
def run(self):
for i in range(0,5):
print(threading.currentThread().getName()+"---"+str(i))
time.sleep(1)
if __name__ == '__main__':
print(threading.currentThread().getName())
thread=MyThread(name="loopthread")
thread.start()
thread.join()
Result:
MainThread
loopthread---0
loopthread---1
loopthread---2
loopthread---3
loopthread---4
2.3.4 多线程并发问题
-
一个线程没操作完,另外线程就开始操作
import threading,time balance=0 def resource(n): global balance balance+=n time.sleep(1) balance-=n print(threading.currentThread().getName()+" n="+str(n)+" balance="+str(balance)) class MyThread(threading.Thread): def __init__(self,n,*args,**kwargs): super().__init__(*args,**kwargs) self.n=n def run(self): for i in range(0,5): resource(self.n) if __name__ == '__main__': thread1=MyThread(n=5,name="thread1") thread2 = MyThread(n=8,name="thread2") thread1.start() thread2.start() thread1.join() thread2.join() Result: thread2 n=8 balance=5thread1 n=5 balance=0 thread1 n=5 balance=8 thread2 n=8 balance=5 thread1 n=5 balance=8 thread2 n=8 balance=5 thread2 n=8 balance=5thread1 n=5 balance=0
2.3.5 多线程中的锁
lock threading.Lock()
获取锁lock threading.RLock()
(对于Lock()只能锁一次来说,RLock()可以多次锁定)void lock.acquire()
获取锁void lock.release()
释放锁锁定时最好利用
try...catch...finally
-
可以使用
with lock:
自动解锁import threading,time balance=0 lock=threading.Lock() def resource(n): global balance with lock: balance+=n time.sleep(1) balance-=n print(threading.currentThread().getName()+" n="+str(n)+" balance="+str(balance)) class MyThread(threading.Thread): def __init__(self,n,*args,**kwargs): super().__init__(*args,**kwargs) self.n=n def run(self): for i in range(0,5): resource(self.n) if __name__ == '__main__': thread1=MyThread(n=5,name="thread1") thread2 = MyThread(n=8,name="thread2") thread1.start() thread2.start() thread1.join() thread2.join() Result: thread1 n=5 balance=0 thread1 n=5 balance=0 thread1 n=5 balance=0 thread1 n=5 balance=0 thread1 n=5 balance=0 thread2 n=8 balance=0 thread2 n=8 balance=0 thread2 n=8 balance=0 thread2 n=8 balance=0 thread2 n=8 balance=0
2.3.6 线程的调度和优化(利用线程池优化)
- 建立线程池,不用每次再建立线程,可以对之前的线程直接拿来使用
# 不使用多线程
import time,threading
def run(n):
time.sleep(1)
print(threading.current_thread().name+" "+str(n))
def low():
global n
begin_time=time.time()
for i in range(0,10):
run(i)
print("time:"+str(time.time()-begin_time))
if __name__ == '__main__':
low()
Result:
MainThread 0
MainThread 1
MainThread 2
MainThread 3
MainThread 4
MainThread 5
MainThread 6
MainThread 7
MainThread 8
MainThread 9
time:10.00597333908081
# 使用多线程,不使用线程池
import time,threading
count=0
def run(n):
time.sleep(1)
print(threading.current_thread().name+" "+str(n))
def normal():
global count
begin_time=time.time()
allthread=[]
for i in range(0,3):
if i!=2:
for m in range(0,4): # 在计算机中一次只能开4个线程,这个是不利用线程池,不重复使用线程
thread=threading.Thread(target=run,args=(count,))
count+=1
thread.start()
allthread.append(thread)
for thread in allthread:
thread.join()
else:
for m in range(0,2): # 最后一轮只用开2个线程
thread=threading.Thread(target=run,args=(count,))
count+=1
thread.start()
allthread.append(thread)
for thread in allthread:
thread.join()
print("time:"+str(time.time()-begin_time))
if __name__ == '__main__':
normal()
Result:
Thread-1 0
Thread-2 1
Thread-4 3Thread-3 2
Thread-5 4
Thread-6 5
Thread-7 6
Thread-8 7
Thread-9 8
Thread-10 9
time:3.0081448554992676
# 第一种线程池 multiprocessing.dummy.Pool
import time,threading
from multiprocessing.dummy import Pool
def run(n):
time.sleep(1)
print(threading.current_thread().name+" "+str(n))
def pool():
begin_time=time.time()
n_list=range(10)
pool=Pool(4)
pool.map(run,n_list)
pool.close()
pool.join()
print("time:"+str(time.time()-begin_time))
if __name__ == '__main__':
pool()
Result:
Thread-1 0
Thread-4 3
Thread-2 1Thread-3 2
Thread-2 7Thread-1 4Thread-3 6
Thread-4 5
Thread-3 8Thread-2 9
time:3.029418706893921
# 第二种线程池(不用start()、join())concurrent.futures.thread.ThreadPoolExecutor
import time,threading
from concurrent.futures.thread import ThreadPoolExecutor
def run(n):
time.sleep(1)
print(threading.current_thread().name+" "+str(n))
def pool():
begin_time=time.time()
n_list=range(10)
with ThreadPoolExecutor(max_workers=4) as pool:
pool.map(run,n_list)
print("time:"+str(time.time()-begin_time))
if __name__ == '__main__':
pool()
Result:
ThreadPoolExecutor-0_0 0
ThreadPoolExecutor-0_2 2
ThreadPoolExecutor-0_3 3
ThreadPoolExecutor-0_1 1
ThreadPoolExecutor-0_0 4
ThreadPoolExecutor-0_2 5ThreadPoolExecutor-0_3 6
ThreadPoolExecutor-0_1 7
ThreadPoolExecutor-0_0 8
ThreadPoolExecutor-0_3 9
time:3.0045437812805176
==数据太少,无法准确判断线程池处理大量数据的明显优势;在大量数据的处理时,线程池效率高,第二种线程池比第一种线程池效率高==
2.4 进程
2.4.1 进程的multiprocessing模块
- 创建进程
Process multiprocessing.Process()
- 启动进程
void Process.start()
- 挂起进程
void Process.join()
- 获取进程的ID
int os.getpid()
2.4.1 不面向对象实现进程
import multiprocessing
import os
import time
def somthing(process_name):
print("process_name:{}".format(process_name))
time.sleep(2)
print("process_id:{}".format(os.getpid()))
if __name__ == '__main__':
process=multiprocessing.Process(target=somthing,args=("my_progress",))
process.start()
process.join()
Result:
process_name:my_progress
process_id:12188
2.4.2 面向对象实现进程
import os
import time
from multiprocessing.context import Process
class MyProgress(Process):
def __init__(self,my_progress_name,*args,**kwargs):
self.my_progress_name=my_progress_name
self.name="i will be covered"
print("super__init__:{}".format(self.name))
# slef.name会被父类的构造方法所覆盖
super().__init__(*args,**kwargs)
print("after_super__init__:{}".format(self.name))
def run(self):
print("----------run------------")
print("my_process_name:{}".format(self.my_progress_name))
print("process_name:{}".format(self.name))
time.sleep(2)
print("process_id:{}".format(os.getpid()))
print("-----------run-----------")
if __name__ == '__main__':
progress=MyProgress("myProgress")
progress.start()
progress.join()
Result:
super__init__:i will be covered
after_super__init__:MyProgress-1
----------run------------
my_process_name:myProgress
process_name:MyProgress-1
process_id:10096
-----------run-----------
2.4.3 进程之间的通信
- 通过Queue、Pipes等实现进程之间的通信
import time
from multiprocessing import Process,Queue,current_process
class WriteProgress(Process):
def __init__(self,queue,*args,**kwargs):
self.queue=queue
super().__init__(*args,**kwargs)
def run(self):
msg=["This is first msg",
"This is second msg",
"This is third msg",
"This is fourth msg",
"This is fifth msg"]
for msg_item in msg:
self.queue.put(msg_item)
print("{}--send--msg:{}".format(current_process(),msg_item))
time.sleep(2)
class ReadProgress(Process):
def __init__(self,queue,*args,**kwargs):
self.queue=queue
super().__init__(*args,**kwargs)
def run(self):
while True:
msg=self.queue.get()
print("{}--get--msg:{}".format(current_process(),msg))
if __name__ == '__main__':
queue=Queue()
write=WriteProgress(queue)
read=ReadProgress(queue)
write.start()
read.start()
write.join()
write.join()
read.terminate()
Result:
<WriteProgress(WriteProgress-1, started)>--send--msg:This is first msg
<ReadProgress(ReadProgress-2, started)>--get--msg:This is first msg
<WriteProgress(WriteProgress-1, started)>--send--msg:This is second msg
<ReadProgress(ReadProgress-2, started)>--get--msg:This is second msg
<WriteProgress(WriteProgress-1, started)>--send--msg:This is third msg
<ReadProgress(ReadProgress-2, started)>--get--msg:This is third msg
<WriteProgress(WriteProgress-1, started)>--send--msg:This is fourth msg
<ReadProgress(ReadProgress-2, started)>--get--msg:This is fourth msg
<WriteProgress(WriteProgress-1, started)>--send--msg:This is fifth msg
<ReadProgress(ReadProgress-2, started)>--get--msg:This is fifth msg
2.4.4 多进程的锁
- 锁
Lock()
RLock()
Condition()
-
RLock()
可以多次锁定 -
Lock()
只能锁一次,否则造成死锁
# 不使用锁,会让进程”随意“执行,一会这个进程执行,一会另一个程执行
from multiprocessing import Process,current_process
import time
class WriteProgress(Process):
def __init__(self,my_name,*args,**kwargs):
self.my_name=my_name
super().__init__(*args,**kwargs)
def run(self):
for i in range(0,3):
print("progress:{}--progress_number{}".format(self.my_name,i))
time.sleep(2)
if __name__ == '__main__':
write1=WriteProgress("Progress1")
write2=WriteProgress("Progress2")
write3=WriteProgress("Progress3")
write1.start()
write2.start()
write3.start()
write1.join()
write2.join()
write3.join()
Result:
progress:Progress1--progress_number0
progress:Progress2--progress_number0
progress:Progress3--progress_number0
progress:Progress1--progress_number1
progress:Progress2--progress_number1
progress:Progress3--progress_number1
progress:Progress1--progress_number2
progress:Progress2--progress_number2
progress:Progress3--progress_number2
# 使用Lock(),会让一个进程执行好后其它进程再执行
from multiprocessing import Process, Lock
import time
class WriteProgress(Process):
def __init__(self,lock,my_name,*args,**kwargs):
self.my_name=my_name
self.lock=lock
super().__init__(*args,**kwargs)
def run(self):
with self.lock:
for i in range(0,3):
print("progress:{}--progress_number:{}".format(self.my_name,i))
time.sleep(2)
if __name__ == '__main__':
lock=Lock()
write1=WriteProgress(lock,"Progress1")
write2=WriteProgress(lock,"Progress2")
write3=WriteProgress(lock,"Progress3")
write1.start()
write2.start()
write3.start()
write1.join()
write2.join()
write3.join()
Result:
progress:Progress1--progress_number:0
progress:Progress1--progress_number:1
progress:Progress1--progress_number:2
progress:Progress2--progress_number:0
progress:Progress2--progress_number:1
progress:Progress2--progress_number:2
progress:Progress3--progress_number:0
progress:Progress3--progress_number:1
progress:Progress3--progress_number:2
2.4.5 进程池
# 利用进程池,同步任务
# 同步任务,每一个任务执行完后才能拿到返回值,而且拿到的就是返回的值,而不是对象
from multiprocessing import Pool,current_process
import time
def run(i):
time.sleep(1)
return "Progress:{}--assignment_number:{}".format(current_process(), i)
if __name__ == '__main__':
pool=Pool(2)
for i in range(0,10):
result=pool.apply(run,args=(i,))
print(result)
pool.close()
pool.join()
Result:
Progress:<SpawnProcess(SpawnPoolWorker-1, started daemon)>--assignment_number:0
Progress:<SpawnProcess(SpawnPoolWorker-2, started daemon)>--assignment_number:1
Progress:<SpawnProcess(SpawnPoolWorker-1, started daemon)>--assignment_number:2
Progress:<SpawnProcess(SpawnPoolWorker-2, started daemon)>--assignment_number:3
Progress:<SpawnProcess(SpawnPoolWorker-1, started daemon)>--assignment_number:4
Progress:<SpawnProcess(SpawnPoolWorker-2, started daemon)>--assignment_number:5
Progress:<SpawnProcess(SpawnPoolWorker-1, started daemon)>--assignment_number:6
Progress:<SpawnProcess(SpawnPoolWorker-2, started daemon)>--assignment_number:7
Progress:<SpawnProcess(SpawnPoolWorker-1, started daemon)>--assignment_number:8
Progress:<SpawnProcess(SpawnPoolWorker-2, started daemon)>--assignment_number:9
# 利用进程池,异步任务
# 异步任务,可以先那到返回值对象,对象拿到的快
# 对象拿完之后,再对任务进行执行,将数据放入对象之中
from multiprocessing import Pool,current_process
import time
def run(i):
# print("Progress:{}--assignment_number:{}".format(current_process(), i))
time.sleep(1)
return "Progress:{}--assignment_number:{}".format(current_process(), i)
if __name__ == '__main__':
pool=Pool(2)
list=[]
for i in range(0,10):
result=pool.apply_async(run,args=(i,))
print(result) # 拿到返回值对象对象很快
list.append(result)
pool.close()
pool.join()
for i in range(0,10):
print(list[i].get())
Result:
<multiprocessing.pool.ApplyResult object at 0x0000019C8399FAC8>
<multiprocessing.pool.ApplyResult object at 0x0000019C8399FBA8>
<multiprocessing.pool.ApplyResult object at 0x0000019C8399FC50>
<multiprocessing.pool.ApplyResult object at 0x0000019C8399FCF8>
<multiprocessing.pool.ApplyResult object at 0x0000019C8399FDA0>
<multiprocessing.pool.ApplyResult object at 0x0000019C8399FE48>
<multiprocessing.pool.ApplyResult object at 0x0000019C8399FFD0>
<multiprocessing.pool.ApplyResult object at 0x0000019C839B90B8>
<multiprocessing.pool.ApplyResult object at 0x0000019C839B9160>
<multiprocessing.pool.ApplyResult object at 0x0000019C839B9208>
## 几乎是肉眼可见的一下子打印,但是在之前有较长一段时间
Progress:<SpawnProcess(SpawnPoolWorker-1, started daemon)>--assignment_number:0
Progress:<SpawnProcess(SpawnPoolWorker-2, started daemon)>--assignment_number:1
Progress:<SpawnProcess(SpawnPoolWorker-1, started daemon)>--assignment_number:2
Progress:<SpawnProcess(SpawnPoolWorker-2, started daemon)>--assignment_number:3
Progress:<SpawnProcess(SpawnPoolWorker-1, started daemon)>--assignment_number:4
Progress:<SpawnProcess(SpawnPoolWorker-2, started daemon)>--assignment_number:5
Progress:<SpawnProcess(SpawnPoolWorker-1, started daemon)>--assignment_number:6
Progress:<SpawnProcess(SpawnPoolWorker-2, started daemon)>--assignment_number:7
Progress:<SpawnProcess(SpawnPoolWorker-1, started daemon)>--assignment_number:8
Progress:<SpawnProcess(SpawnPoolWorker-2, started daemon)>--assignment_number:9
2.5 协程
2.5.1 Python3.5之前协程的实现
-
yield
生成器来实现
def yield_test(): # 协程
while True:
n=(yield)
print(n)
if __name__ == '__main__':
result=yield_test()
next(result)
result.send("send=ok")
result.send("hello")
Result:
send=ok
hello
2.5.2 Python3.5之后协程的实现
- 使用
async
和await
关键字来实现 -
async
关键字
- 定义特殊的函数
- 被调用时,不执行里面的代码,而是返回一个协程对象
- 在时间循环中调度其执行前,协程对象不执行任何操作
-
await
关键字
- 等待协程执行完成
- 当遇到阻塞调用的函数的时候,使用
await
方法将协程的控制权让出,以便loop调用其它的协程
-
asyncio
模块-
get_event_loop()
获取事件循环队列 -
run_until_complete()
注册任务到队列 - 在事件循环中调度其执行前,协程对象不执行任何操作
-
asyncio
模块用于事件循环 -
Bool asyncio.iscoroutinefunction(function)
判断是否为协程函数
-
import asyncio
async def do():
print("we are doing it......")
await asyncio.sleep(2)
obj=do()
# 得到事件循环队列
loop=asyncio.get_event_loop()
# 注册任务
task=loop.create_task(obj)
print(task)
# 等待协程任务执行完成
loop.run_until_complete(task)
print(task)
Result:
<Task pending coro=<do() running at D:/study/python/MyTest/test.py:150>>
we are doing it......
## wait 2 second
<Task finished coro=<do() done, defined at D:/study/python/MyTest/test.py:150> result=None>
2.5.3 协程通信之嵌套调用
# 一个协程函数使用另一个协程函数的返回值
import asyncio
async def add(x,y):
print("We are adding.......")
await asyncio.sleep(1)
print("We added completely")
return x+y
async def get_add(x,y):
result=await add(x,y)
print("result={}".format(result))
loop=asyncio.get_event_loop()
loop.run_until_complete(get_add(10,50))
loop.close()
Result:
We are adding.......
We added completely
result=60
2.5.4 协程通信之队列
import asyncio
async def add(queue):
for i in range(0,5):
await asyncio.sleep(1)
await queue.put(i) # put()得到一个协程对象
print("add number-->{},size-->{}".format(i,queue.qsize()))
async def reduce(queue):
for i in range(0,10):
result=await queue.get() # get()得到一个协程对象
print("reduce number-->{},size:{}".format(result,queue.qsize()))
if __name__ == '__main__':
queue=asyncio.Queue(maxsize=5)
add1=add(queue)
add2=add(queue)
reduce=reduce(queue)
loop=asyncio.get_event_loop()
loop.run_until_complete(asyncio.gather(add1,add2,reduce))
loop.close()
Result:
add number-->0,size-->1
add number-->0,size-->2
reduce number-->0,size:1
reduce number-->0,size:0
add number-->1,size-->1
add number-->1,size-->2
reduce number-->1,size:1
reduce number-->1,size:0
add number-->2,size-->1
add number-->2,size-->2
reduce number-->2,size:1
reduce number-->2,size:0
add number-->3,size-->1
add number-->3,size-->2
reduce number-->3,size:1
reduce number-->3,size:0
add number-->4,size-->1
add number-->4,size-->2
reduce number-->4,size:1
reduce number-->4,size:0