引用计数和分代回收
多线程和锁和线程池
多进程和通信和锁和进程池
协程和异步io async await
内存管理
1. 赋值语句分析
值传递,传的是对象的地址,数字字符元组为不可变类型,赋值后会指向新的地址
def p(word, res=[]):
print(res, id(res))
res.append(word)
print(res, id(res))
return res
res1 = p(3)
res2 = p('12', [])
res3 = p(10)
# result
[] 4447330952
[3] 4447330952
[] 4447252360
['12'] 4447252360
[3] 4447330952
[3, 10] 4447330952
2. 垃圾回收机制
- 以引用计数为主,分代收集为辅
- 如果一个对象的引用数为0,python就会回收这个对象的内存
- 引用计数的缺陷是循环引用的问题,所以用分代回收机制来解决这个问题
3. 内存管理机制
- 引用计数
sys.getrefcount() 获得摸个对象的引用计数,使用del关键字删除某个引用 - 垃圾回收
python记录分配对象和取消分配对象的次数,当两者的差值高于某个阈值时,垃圾回收才启动
通过gc.get_threshold()来查看 0,1,2代的阈值 - 分代回收
新建对象为0代对象,某一代经历垃圾回收,依然存在,归入下一代对象
git.collect(generation=2)指定哪个代回收,默认是2代表所有都回收了
del p 删除某个对象
objgraph模块中的count()可以记录当前类产生的实例对象的个数:objgraph.count('Cat') Cat类的对象个数 - 官方指南
https://docs.python.org/3/library/gc.html#gc.collect
Set the garbage collection thresholds (the collection frequency).
Setting *threshold0* to zero disables collection.
The GC classifies objects into three generations depending on how many collection
sweeps they have survived. New objects are placed in the youngest generation
(generation `0`). If an object survives a collection it is moved into the next older
generation. Since generation `2` is the oldest generation, objects in that generation
remain there after a collection. In order to decide when to run, the collector keeps track of
the number object allocations and deallocations since the last collection. When the
number of allocations minus the number of deallocations exceeds *threshold0*, collection
starts. Initially only generation `0` is examined. If generation `0` has been examined more
than *threshold1* times since generation `1` has been examined, then generation `1` is
examined as well. Similarly, *threshold2* controls the number of collections of
generation `1` before collecting generation `2`.
- 内存池机制,预先在内存中申请一定数量的,大小想到的内存块作备用,有新的内存需求先从内存池中分配; Pyhton3内存管理机制 Pymalloc 针对小对象(<512bytes) ,pymalloc会在内存池中申请内存空间,当>512bytes, 则会PyMem_RawMalloc()和PyMem_RawRealloc()来申请新的空间
4. 源码剖析
https://github.com/Junnplus/blog/projects/1
多线程
1. 进程,线程和协程
2. 进程和线程的关系
进程的内存空间等等不同
线程共享进程上下文,包括开始,执行顺序,结束;可以被抢占(中断)和临时挂起(睡眠)- 让步
并行(parallelism)一个时间有多个任务,并发(concurrency)多个任务同时运行
3. 多核
GIL - 全局解释器锁
GIL强制在任何时候只有一个线程可以执行python代码
I/O密集型应用与CPU密集型应用
GIL执行顺序:
- 设置GIL
- 切换一个线程运行
- 执行指定数量的字节码指令
- 线程主动让出控制权(time.sleep(0))
- 把线程设置为睡眠状态(切换出线程)
- 解锁GIL
- 重复上面的步骤
4. 实现一个线程
threading.Thread创建线程,start()启动线程,join()挂起线程
两种创建方法:载入运行的函数,或者继承Thread类
# 方法1
import threading
import time
def loop():
""" 新的线程执行的代码 """
n = 0
while n < 5:
print(n)
now_thread = threading.current_thread()
print('[loop]now thread name : {0}'.format(now_thread.name))
time.sleep(1)
n += 1
def use_thread():
""" 使用线程来实现 """
# 当前正在执行的线程名称
now_thread = threading.current_thread()
print('now thread name : {0}'.format(now_thread.name))
# 设置线程
t = threading.Thread(target=loop, name='loop_thread')
# 启动线程
t.start()
# 挂起线程
t.join()
if __name__ == '__main__':
use_thread()
# 方法2
import threading
import time
class LoopThread(threading.Thread):
""" 自定义线程 """
n = 0
def run(self):
while self.n < 5:
print(self.n)
now_thread = threading.current_thread()
print('[loop]now thread name : {0}'.format(now_thread.name))
time.sleep(1)
self.n += 1
if __name__ == '__main__':
# 当前正在执行的线程名称
now_thread = threading.current_thread()
print('now thread name : {0}'.format(now_thread.name))
t = LoopThread(name='loop_thread_oop')
t.start()
t.join()
5. 多线程并发问题和锁
线程ThreadLocal
https://www.liaoxuefeng.com/wiki/1016959663602400/1017630786314240
Lock和RLock和Condition,RLock可以锁多次(在同一个线程里),释放的时候也要释放多次
有两种实现方式:try except finally; with
import threading
import time
# 获得一把锁
my_lock = threading.Lock()
your_lock = threading.RLock()
# 我的银行账户
balance = 0
def change_it(n):
""" 改变我的余额 """
global balance
# 方式一,使用with
with your_lock:
balance = balance + n
time.sleep(2)
balance = balance - n
time.sleep(1)
print('-N---> {0}; balance: {1}'.format(n, balance))
# 方式二
# try:
# print('start lock')
# # 添加锁
# your_lock.acquire()
# print('locked one ')
# # 资源已经被锁住了,不能重复锁定, 产生死锁
# your_lock.acquire()
# print('locked two')
# balance = balance + n
# time.sleep(2)
# balance = balance - n
# time.sleep(1)
# print('-N---> {0}; balance: {1}'.format(n, balance))
# finally:
# # 释放掉锁
# your_lock.release()
# your_lock.release()
class ChangeBalanceThread(threading.Thread):
"""
改变银行余额的线程
"""
def __init__(self, num, *args, **kwargs):
super().__init__(*args, **kwargs)
self.num = num
def run(self):
for i in range(100):
change_it(self.num)
if __name__ == '__main__':
t1 = ChangeBalanceThread(5)
t2 = ChangeBalanceThread(8)
t1.start()
t2.start()
t1.join()
t2.join()
print('the last: {0}'.format(balance))
6. 线程的调度和优化,线程池
使用线程池
两种方法:
Pool(10) map()/submit() close() join()
with ThreadPoolExecutor(max_workers=10) as executor: map()
import time
import threading
from concurrent.futures import ThreadPoolExecutor
from multiprocessing.dummy import Pool
def run(n):
""" 线程要做的事情 """
time.sleep(2)
print(threading.current_thread().name, n)
def main():
""" 使用传统的方法来做任务 """
t1 = time.time()
for n in range(100):
run(n)
print(time.time() - t1)
def main_use_thread():
""" 使用线程优化任务 """
# 资源有限,最多只能跑10个线程
t1 = time.time()
ls = []
for count in range(10):
for i in range(10):
t = threading.Thread(target=run, args=(i,))
ls.append(t)
t.start()
for l in ls:
l.join()
print(time.time() - t1)
def main_use_pool():
""" 使用线程池来优化 """
t1 = time.time()
n_list = range(100)
pool = Pool(10)
pool.map(run, n_list)
pool.close()
pool.join()
print(time.time() - t1)
def main_use_executor():
""" 使用 ThreadPoolExecutor 来优化"""
t1 = time.time()
n_list = range(100)
with ThreadPoolExecutor(max_workers=10) as executor:
executor.map(run, n_list)
print(time.time() - t1)
if __name__ == '__main__':
# main()
# main_use_thread()
# main_use_pool()
main_use_executor()
7. 进程实现
multiprocessing模块: multiprocessing.Process; start() join(); os.getpid()
两种方式去实现:传入函数或者面向对象
import os
import time
from multiprocessing import Process
def do_sth(name):
"""
进程要做的事情
:param name: str 进程的名称
"""
print('进程的名称:{0}, pid: {1}'.format(name, os.getpid()))
time.sleep(150)
print('进程要做的事情')
class MyProcess(Process):
def __init__(self, name, *args, **kwargs):
super().__init__(*args, **kwargs)
self.my_name = name
def run(self):
print('MyProcess进程的名称:{0}, pid: {1}'.format(
self.my_name, os.getpid()))
time.sleep(150)
print('MyProcess进程要做的事情')
if __name__ == '__main__':
# p = Process(target=do_sth, args=('my process', ))
p = MyProcess('my process class')
# 启动进程
p.start()
# 挂起进程
p.join()
8. 多进程通信
Queue和Pipes
共享一个对象https://docs.python.org/3/library/multiprocessing.html#proxy-objects
Multiprocessing.Value()
from multiprocessing import Process, Queue, current_process
import random
import time
class WriteProcess(Process):
""" 写的进程 """
def __init__(self, q, *args, **kwargs):
self.q = q
super().__init__(*args, **kwargs)
def run(self):
""" 实现进程的业务逻辑 """
# 要写的内容
ls = [
"第一行内容",
"第2行内容",
"第3行内容",
"第4行内容",
]
for line in ls:
print('写入内容: {0} -{1}'.format(line, current_process().name))
self.q.put(line)
# 每写入一次,休息1-5秒
time.sleep(random.randint(1, 5))
class ReadProcess(Process):
""" 读取内容进程 """
def __init__(self, q, *args, **kwargs):
self.q = q
super().__init__(*args, **kwargs)
def run(self):
while True:
content = self.q.get()
print('读取到的内容:{0} - {1}'.format(content, current_process().name))
if __name__ == '__main__':
# 通过Queue共享数据
q = Queue()
# 写入内容的进程
t_write = WriteProcess(q)
t_write.start()
# 读取进程启动
t_read = ReadProcess(q)
t_read.start()
t_write.join()
# 因为读的进程是死循环,无法等待其结束,只能强制终止
t_read.terminate()
9. 多进程中的锁
Lock(), Rlock(), Condition()
机制跟多线程锁基本是一样的,用try finally 或者with结构
import random
from multiprocessing import Process, Lock, RLock
import time
class WriteProcess(Process):
""" 写入文件 """
def __init__(self, file_name, num, lock, *args, **kwargs):
# 文件的名称
self.file_name = file_name
self.num = num
# 锁对象
self.lock = lock
super().__init__(*args, **kwargs)
def run(self):
""" 写入文件的主要业务逻辑 """
with self.lock:
# try:
# # 添加锁
# self.lock.acquire()
# print('locked')
# self.lock.acquire()
# print('relocked')
for i in range(5):
content = '现在是: {0} : {1} - {2} \n'.format(
self.name,
self.pid,
self.num
)
with open(self.file_name, 'a+', encoding='utf-8') as f:
f.write(content)
time.sleep(random.randint(1, 5))
print(content)
# finally:
# # 释放锁
# self.lock.release()
# self.lock.release()
if __name__ == '__main__':
file_name = 'test.txt'
# 所的对象
lock = RLock()
for x in range(5):
p = WriteProcess(file_name, x, lock)
p.start()
10. 进程池
有同步和异步的方法;Pool() apply() apply_async() map() close()
https://docs.python.org/3/library/multiprocessing.html?highlight=pool#module-multiprocessing.pool
import random
from multiprocessing import current_process, Pool
import time
def run(file_name, num):
"""
进程执行的业务逻辑
往文件中写入数据
:param file_name: str 文件名称
:param num: int 写入的数字
:return: str 写入的结果
"""
with open(file_name, 'a+', encoding='utf-8') as f:
# 当前的进程
now_process = current_process()
# 写入的内容
conent = '{0} - {1}- {2}'.format(
now_process.name,
now_process.pid,
num
)
f.write(conent)
f.write('\n')
# 写完之后随机休息1-5秒
time.sleep(random.randint(1, 5))
print(conent)
return 'ok'
if __name__ == '__main__':
file_name = 'test_pool.txt'
# 进程池
pool = Pool(2)
rest_list = []
for i in range(20):
# 同步添加任务
# rest = pool.apply(run, args=(file_name, i))
rest = pool.apply_async(run, args=(file_name, i))
rest_list.append(rest)
print('{0}--- {1}'.format(i, rest))
# 关闭池子
pool.close()
pool.join()
# 查看异步执行的结果
print(rest_list[0].get())
11. 协程
协同多任务,不需要锁机制,用多进程+协程对多核CPU的利用
python 3..5之前使用 yield实现
https://docs.python.org/3/library/asyncio-task.html
def count_down(n):
""" 倒计时效果 """
while n > 0:
yield n
n -= 1
def yield_test():
""" 实现协程函数 """
while True:
n = (yield )
print(n)
if __name__ == '__main__':
# rest = count_down(5)
# print(next(rest))
# print(next(rest))
# print(next(rest))
# print(next(rest))
# print(next(rest))
rest = yield_test()
next(rest)
rest.send('6666')
rest.send('6666')
之后用async和await来实现
async函数被调用时,返回一个协程对象,不执行这个函数
在事件循环调度其执行前,协程对象不执行
await等待协程执行完成,当遇到阻塞调用的函数时,await方法将协程的控制权让出,继续执行其他协程
asyncio模块:get_event_loop() run_until_complete() iscoroutinefunction(do_sth)
import asyncio
async def do_sth(x):
""" 定义协程函数 """
print('等待中: {0}'.format(x))
await asyncio.sleep(x)
# 判断是否为协程函数
print(asyncio.iscoroutinefunction(do_sth))
coroutine = do_sth(5)
# 事件的循环队列
loop = asyncio.get_event_loop()
# 注册任务
task = loop.create_task(coroutine)
print(task)
# 等待协程任务执行结束
loop.run_until_complete(task)
print(task)
12. 协程通信之嵌套调用和队列
- 嵌套调用
import asyncio
async def compute(x, y):
print('计算x +y => {0}+{1}'.format(x, y))
await asyncio.sleep(3)
return x + y
async def get_sum(x, y):
rest = await compute(x, y)
print('{0} + {1} = {2}'.format(x, y, rest))
# 拿到事件循环
loop = asyncio.get_event_loop()
loop.run_until_complete(get_sum(1, 2))
loop.close()
- 队列
# 1. 定义一个队列
# 2. 让两个协程来进行通信
# 3. 让其中一个协程往队列中写入数据
# 4. 让另一个协程从队列中删除数据
import asyncio
import random
async def add(store, name):
"""
写入数据到队列
:param store: 队列的对象
:return:
"""
for i in range(5):
# 往队列中添加数字
num = '{0} - {1}'.format(name, i)
await asyncio.sleep(random.randint(1, 5))
await store.put(i)
print('{2} add one ... {0}, size: {1}'.format(
num, store.qsize(), name))
async def reduce(store):
"""
从队列中删除数据
:param store:
:return:
"""
for i in range(10):
rest = await store.get()
print(' reduce one.. {0}, size: {1}'.format(rest, store.qsize()))
if __name__ == '__main__':
# 准备一个队列
store = asyncio.Queue(maxsize=5)
a1 = add(store, 'a1')
a2 = add(store, 'a2')
r1 = reduce(store)
# 添加到事件队列
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.gather(a1, a2, r1))
loop.close()
使用async和await来改写协程
import random
from queue import Queue
import asyncio
class Bread(object):
""" 馒头类 """
def __init__(self, name):
self.name = name
def __str__(self):
return self.name
async def consumer(name, basket, lock):
"""
消费者
:param name 协程的名称
:param basket 篮子,用于存放馒头
:return:
"""
while True:
with await lock:
# 如果没有馒头了,则自己休息,唤醒生产者进行生产
if basket.empty():
print('{0}@@@消费完了@@@'.format(name))
# 唤醒他人
lock.notify_all()
# 休息
await lock.wait()
else:
# 取出馒头
bread = basket.get()
print('>>{0} 消费馒头 {1}, size: {2}'.format(
name, bread, basket.qsize()
))
await asyncio.sleep(random.randint(1, 5))
async def producer(name, basket, lock):
"""
生产者
:param name 协程的名称
:param basket 篮子,用于存放馒头
:return:
"""
print('{0} 开始生产'.format(name))
while True:
with await lock:
# 馒头生产满了,休息生产者,唤醒消费者进行消费
if basket.full():
print('{0} 生产满了'.format(name))
# 唤醒他人
lock.notify_all()
# 自己休息
await lock.wait()
else:
# 馒头的名字
bread_name = '{0}_{1}'.format(name, basket.counter)
bread = Bread(bread_name)
# 将馒头放入篮子
basket.put(bread)
print('>>{0} 生产馒头 {1}, size: {2}'.format(name, bread_name, basket.qsize()))
# 计数+ 1
basket.counter += 1
await asyncio.sleep(random.randint(1, 2))
class Basket(Queue):
""" 自定义的仓库 """
# 馒头生产的计数器
counter = 0
def main():
lock = asyncio.Condition()
# 篮子,用于放馒头,协程通信使用
basket = Basket(maxsize=5)
p1 = producer('P1', basket, lock)
p2 = producer('P2', basket, lock)
p3 = producer('P3', basket, lock)
c1 = consumer('C1', basket, lock)
c2 = consumer('C2', basket, lock)
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.gather(p1, p2, p3, c1, c2))
loop.close()
if __name__ == '__main__':
main()
官方文档
https://docs.python.org/3/library/asyncio-task.html#coroutine