Python基础语法 - 5 内存管理和多线程

引用计数和分代回收
多线程和锁和线程池
多进程和通信和锁和进程池
协程和异步io async await

内存管理

1. 赋值语句分析

值传递,传的是对象的地址,数字字符元组为不可变类型,赋值后会指向新的地址

def p(word, res=[]):
  print(res, id(res))
  res.append(word)
  print(res, id(res))
  return res
res1 = p(3)
res2 = p('12', [])
res3 = p(10)
# result
[] 4447330952
[3] 4447330952
[] 4447252360
['12'] 4447252360
[3] 4447330952
[3, 10] 4447330952

2. 垃圾回收机制

  • 以引用计数为主,分代收集为辅
  • 如果一个对象的引用数为0,python就会回收这个对象的内存
  • 引用计数的缺陷是循环引用的问题,所以用分代回收机制来解决这个问题

3. 内存管理机制

  • 引用计数
    sys.getrefcount() 获得摸个对象的引用计数,使用del关键字删除某个引用
  • 垃圾回收
    python记录分配对象和取消分配对象的次数,当两者的差值高于某个阈值时,垃圾回收才启动
    通过gc.get_threshold()来查看 0,1,2代的阈值
  • 分代回收
    新建对象为0代对象,某一代经历垃圾回收,依然存在,归入下一代对象
    git.collect(generation=2)指定哪个代回收,默认是2代表所有都回收了
    del p 删除某个对象
    objgraph模块中的count()可以记录当前类产生的实例对象的个数:objgraph.count('Cat') Cat类的对象个数
  • 官方指南
    https://docs.python.org/3/library/gc.html#gc.collect
Set the garbage collection thresholds (the collection frequency). 
Setting *threshold0* to zero disables collection.

The GC classifies objects into three generations depending on how many collection
sweeps they have survived. New objects are placed in the youngest generation
(generation `0`). If an object survives a collection it is moved into the next older
generation. Since generation `2` is the oldest generation, objects in that generation
remain there after a collection. In order to decide when to run, the collector keeps track of
the number object allocations and deallocations since the last collection. When the
number of allocations minus the number of deallocations exceeds *threshold0*, collection
starts. Initially only generation `0` is examined. If generation `0` has been examined more
than *threshold1* times since generation `1` has been examined, then generation `1` is
examined as well. Similarly, *threshold2* controls the number of collections of
generation `1` before collecting generation `2`.
  • 内存池机制,预先在内存中申请一定数量的,大小想到的内存块作备用,有新的内存需求先从内存池中分配; Pyhton3内存管理机制 Pymalloc 针对小对象(<512bytes) ,pymalloc会在内存池中申请内存空间,当>512bytes, 则会PyMem_RawMalloc()和PyMem_RawRealloc()来申请新的空间

4. 源码剖析

https://github.com/Junnplus/blog/projects/1

多线程

1. 进程,线程和协程

2. 进程和线程的关系

进程的内存空间等等不同
线程共享进程上下文,包括开始,执行顺序,结束;可以被抢占(中断)和临时挂起(睡眠)- 让步
并行(parallelism)一个时间有多个任务,并发(concurrency)多个任务同时运行

3. 多核

GIL - 全局解释器锁
GIL强制在任何时候只有一个线程可以执行python代码
I/O密集型应用与CPU密集型应用
GIL执行顺序:

  • 设置GIL
  • 切换一个线程运行
    • 执行指定数量的字节码指令
    • 线程主动让出控制权(time.sleep(0))
  • 把线程设置为睡眠状态(切换出线程)
  • 解锁GIL
  • 重复上面的步骤

4. 实现一个线程

threading.Thread创建线程,start()启动线程,join()挂起线程





两种创建方法:载入运行的函数,或者继承Thread类

# 方法1
import threading
import time


def loop():
    """ 新的线程执行的代码 """
    n = 0
    while n < 5:
        print(n)
        now_thread = threading.current_thread()
        print('[loop]now  thread name : {0}'.format(now_thread.name))
        time.sleep(1)
        n += 1


def use_thread():
    """ 使用线程来实现 """
    # 当前正在执行的线程名称
    now_thread = threading.current_thread()
    print('now  thread name : {0}'.format(now_thread.name))
    # 设置线程
    t = threading.Thread(target=loop, name='loop_thread')
    # 启动线程
    t.start()
    # 挂起线程
    t.join()


if __name__ == '__main__':
    use_thread()

# 方法2
import threading
import time


class LoopThread(threading.Thread):
    """ 自定义线程 """

    n = 0

    def run(self):
        while self.n < 5:
            print(self.n)
            now_thread = threading.current_thread()
            print('[loop]now  thread name : {0}'.format(now_thread.name))
            time.sleep(1)
            self.n += 1


if __name__ == '__main__':
    # 当前正在执行的线程名称
    now_thread = threading.current_thread()
    print('now  thread name : {0}'.format(now_thread.name))
    t = LoopThread(name='loop_thread_oop')
    t.start()
    t.join()

5. 多线程并发问题和锁

线程ThreadLocal
https://www.liaoxuefeng.com/wiki/1016959663602400/1017630786314240
Lock和RLock和Condition,RLock可以锁多次(在同一个线程里),释放的时候也要释放多次
有两种实现方式:try except finally; with

import threading
import time


# 获得一把锁
my_lock = threading.Lock()
your_lock = threading.RLock()

# 我的银行账户
balance = 0


def change_it(n):
    """ 改变我的余额 """
    global balance

    # 方式一,使用with
    with your_lock:
        balance = balance + n
        time.sleep(2)
        balance = balance - n
        time.sleep(1)
        print('-N---> {0}; balance: {1}'.format(n, balance))

    # 方式二
    # try:
    #     print('start lock')
    #     # 添加锁
    #     your_lock.acquire()
    #     print('locked one ')
    #     # 资源已经被锁住了,不能重复锁定, 产生死锁
    #     your_lock.acquire()
    #     print('locked two')
    #     balance = balance + n
    #     time.sleep(2)
    #     balance = balance - n
    #     time.sleep(1)
    #     print('-N---> {0}; balance: {1}'.format(n, balance))
    # finally:
    #     # 释放掉锁
    #     your_lock.release()
    #     your_lock.release()


class ChangeBalanceThread(threading.Thread):
    """
    改变银行余额的线程
    """

    def __init__(self, num, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.num = num

    def run(self):
        for i in range(100):
            change_it(self.num)


if __name__ == '__main__':
    t1 = ChangeBalanceThread(5)
    t2 = ChangeBalanceThread(8)
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    print('the last: {0}'.format(balance))

6. 线程的调度和优化,线程池

使用线程池
两种方法:
Pool(10) map()/submit() close() join()
with ThreadPoolExecutor(max_workers=10) as executor: map()

import time
import threading
from concurrent.futures import ThreadPoolExecutor
from multiprocessing.dummy import Pool


def run(n):
    """ 线程要做的事情 """
    time.sleep(2)
    print(threading.current_thread().name, n)


def main():
    """ 使用传统的方法来做任务 """
    t1 = time.time()
    for n in range(100):
        run(n)
    print(time.time() - t1)


def main_use_thread():
    """ 使用线程优化任务 """
    # 资源有限,最多只能跑10个线程
    t1 = time.time()
    ls = []
    for count in range(10):
        for i in range(10):
            t = threading.Thread(target=run, args=(i,))
            ls.append(t)
            t.start()

        for l in ls:
            l.join()
    print(time.time() - t1)


def main_use_pool():
    """ 使用线程池来优化 """
    t1 = time.time()
    n_list = range(100)
    pool = Pool(10)
    pool.map(run, n_list)
    pool.close()
    pool.join()
    print(time.time() - t1)


def main_use_executor():
    """ 使用 ThreadPoolExecutor 来优化"""
    t1 = time.time()
    n_list = range(100)
    with ThreadPoolExecutor(max_workers=10) as executor:
        executor.map(run, n_list)
    print(time.time() - t1)


if __name__ == '__main__':
    # main()
    # main_use_thread()
    # main_use_pool()
    main_use_executor()

7. 进程实现

multiprocessing模块: multiprocessing.Process; start() join(); os.getpid()
两种方式去实现:传入函数或者面向对象

import os
import time
from multiprocessing import Process


def do_sth(name):
    """
    进程要做的事情
    :param name: str 进程的名称
    """
    print('进程的名称:{0}, pid: {1}'.format(name, os.getpid()))
    time.sleep(150)
    print('进程要做的事情')


class MyProcess(Process):

    def __init__(self, name, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.my_name = name

    def run(self):
        print('MyProcess进程的名称:{0}, pid: {1}'.format(
            self.my_name, os.getpid()))
        time.sleep(150)
        print('MyProcess进程要做的事情')


if __name__ == '__main__':
    # p = Process(target=do_sth, args=('my process', ))
    p = MyProcess('my process class')
    # 启动进程
    p.start()
    # 挂起进程
    p.join()

8. 多进程通信

Queue和Pipes
共享一个对象https://docs.python.org/3/library/multiprocessing.html#proxy-objects

Multiprocessing.Value()

from multiprocessing import Process, Queue, current_process

import random
import time


class WriteProcess(Process):
    """ 写的进程 """
    def __init__(self, q, *args, **kwargs):
        self.q = q
        super().__init__(*args, **kwargs)

    def run(self):
        """ 实现进程的业务逻辑 """
        # 要写的内容
        ls = [
            "第一行内容",
            "第2行内容",
            "第3行内容",
            "第4行内容",
        ]
        for line in ls:
            print('写入内容: {0} -{1}'.format(line, current_process().name))
            self.q.put(line)
            # 每写入一次,休息1-5秒
            time.sleep(random.randint(1, 5))


class ReadProcess(Process):
    """ 读取内容进程 """
    def __init__(self, q, *args, **kwargs):
        self.q = q
        super().__init__(*args, **kwargs)

    def run(self):
        while True:
            content = self.q.get()
            print('读取到的内容:{0} - {1}'.format(content, current_process().name))


if __name__ == '__main__':
    # 通过Queue共享数据
    q = Queue()
    # 写入内容的进程
    t_write = WriteProcess(q)
    t_write.start()
    # 读取进程启动
    t_read = ReadProcess(q)
    t_read.start()
    t_write.join()

    # 因为读的进程是死循环,无法等待其结束,只能强制终止
    t_read.terminate()

9. 多进程中的锁

Lock(), Rlock(), Condition()
机制跟多线程锁基本是一样的,用try finally 或者with结构

import random
from multiprocessing import Process, Lock, RLock

import time


class WriteProcess(Process):
    """ 写入文件 """

    def __init__(self, file_name, num, lock, *args, **kwargs):
        # 文件的名称
        self.file_name = file_name
        self.num = num
        # 锁对象
        self.lock = lock
        super().__init__(*args, **kwargs)

    def run(self):
        """ 写入文件的主要业务逻辑 """
        with self.lock:
        # try:
        #     # 添加锁
        #     self.lock.acquire()
        #     print('locked')
        #     self.lock.acquire()
        #     print('relocked')
            for i in range(5):
                content = '现在是: {0} : {1} - {2} \n'.format(
                    self.name,
                    self.pid,
                    self.num
                )
                with open(self.file_name, 'a+', encoding='utf-8') as f:
                    f.write(content)
                    time.sleep(random.randint(1, 5))
                    print(content)
        # finally:
        #     # 释放锁
        #     self.lock.release()
        #     self.lock.release()


if __name__ == '__main__':
    file_name = 'test.txt'
    # 所的对象
    lock = RLock()
    for x in range(5):
        p = WriteProcess(file_name, x, lock)
        p.start()

10. 进程池

有同步和异步的方法;Pool() apply() apply_async() map() close()
https://docs.python.org/3/library/multiprocessing.html?highlight=pool#module-multiprocessing.pool

import random
from multiprocessing import current_process, Pool

import time


def run(file_name, num):
    """
    进程执行的业务逻辑
    往文件中写入数据
    :param file_name: str 文件名称
    :param num: int 写入的数字
    :return: str 写入的结果
    """
    with open(file_name, 'a+', encoding='utf-8') as f:
        # 当前的进程
        now_process = current_process()
        # 写入的内容
        conent = '{0} - {1}- {2}'.format(
            now_process.name,
            now_process.pid,
            num
        )
        f.write(conent)
        f.write('\n')
        # 写完之后随机休息1-5秒
        time.sleep(random.randint(1, 5))
        print(conent)
    return 'ok'


if __name__ == '__main__':
    file_name = 'test_pool.txt'
    # 进程池
    pool = Pool(2)
    rest_list = []
    for i in range(20):
        # 同步添加任务
        # rest = pool.apply(run, args=(file_name, i))
        rest = pool.apply_async(run, args=(file_name, i))
        rest_list.append(rest)
        print('{0}--- {1}'.format(i, rest))
    # 关闭池子
    pool.close()
    pool.join()
    # 查看异步执行的结果
    print(rest_list[0].get())

11. 协程

协同多任务,不需要锁机制,用多进程+协程对多核CPU的利用
python 3..5之前使用 yield实现
https://docs.python.org/3/library/asyncio-task.html



def count_down(n):
    """ 倒计时效果 """
    while n > 0:
        yield n
        n -= 1


def yield_test():
    """ 实现协程函数 """
    while True:
        n = (yield )
        print(n)


if __name__ == '__main__':
    # rest = count_down(5)
    # print(next(rest))
    # print(next(rest))
    # print(next(rest))
    # print(next(rest))
    # print(next(rest))
    rest = yield_test()
    next(rest)
    rest.send('6666')
    rest.send('6666')

之后用async和await来实现
async函数被调用时,返回一个协程对象,不执行这个函数
在事件循环调度其执行前,协程对象不执行
await等待协程执行完成,当遇到阻塞调用的函数时,await方法将协程的控制权让出,继续执行其他协程
asyncio模块:get_event_loop() run_until_complete() iscoroutinefunction(do_sth)

import asyncio


async def do_sth(x):
    """ 定义协程函数 """
    print('等待中: {0}'.format(x))
    await asyncio.sleep(x)

# 判断是否为协程函数
print(asyncio.iscoroutinefunction(do_sth))

coroutine = do_sth(5)
# 事件的循环队列
loop = asyncio.get_event_loop()
# 注册任务
task = loop.create_task(coroutine)
print(task)
# 等待协程任务执行结束
loop.run_until_complete(task)
print(task)

12. 协程通信之嵌套调用和队列

  • 嵌套调用
import asyncio


async def compute(x, y):
    print('计算x +y => {0}+{1}'.format(x, y))
    await asyncio.sleep(3)
    return x + y


async def get_sum(x, y):
    rest = await compute(x, y)
    print('{0} + {1} = {2}'.format(x, y, rest))

# 拿到事件循环
loop = asyncio.get_event_loop()
loop.run_until_complete(get_sum(1, 2))
loop.close()
  • 队列
# 1. 定义一个队列
# 2. 让两个协程来进行通信
# 3. 让其中一个协程往队列中写入数据
# 4. 让另一个协程从队列中删除数据

import asyncio
import random


async def add(store, name):
    """
    写入数据到队列
    :param store: 队列的对象
    :return:
    """
    for i in range(5):
        # 往队列中添加数字
        num = '{0} - {1}'.format(name, i)
        await asyncio.sleep(random.randint(1, 5))
        await store.put(i)
        print('{2} add one ... {0}, size: {1}'.format(
            num, store.qsize(), name))


async def reduce(store):
    """
    从队列中删除数据
    :param store:
    :return:
    """
    for i in range(10):
        rest = await store.get()
        print(' reduce one.. {0}, size: {1}'.format(rest, store.qsize()))


if __name__ == '__main__':
    # 准备一个队列
    store = asyncio.Queue(maxsize=5)
    a1 = add(store, 'a1')
    a2 = add(store, 'a2')
    r1 = reduce(store)

    # 添加到事件队列
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.gather(a1, a2, r1))
    loop.close()

使用async和await来改写协程

import random
from queue import Queue

import asyncio


class Bread(object):
    """ 馒头类 """

    def __init__(self, name):
        self.name = name

    def __str__(self):
        return self.name


async def consumer(name, basket, lock):
    """
    消费者
    :param name  协程的名称
    :param basket  篮子,用于存放馒头
    :return:
    """
    while True:
        with await lock:
            # 如果没有馒头了,则自己休息,唤醒生产者进行生产
            if basket.empty():
                print('{0}@@@消费完了@@@'.format(name))
                # 唤醒他人
                lock.notify_all()
                # 休息
                await lock.wait()
            else:
                # 取出馒头
                bread = basket.get()
                print('>>{0} 消费馒头 {1}, size: {2}'.format(
                    name, bread, basket.qsize()
                ))
                await asyncio.sleep(random.randint(1, 5))


async def producer(name, basket, lock):
    """
    生产者
    :param name  协程的名称
    :param basket  篮子,用于存放馒头
    :return:
    """
    print('{0} 开始生产'.format(name))
    while True:
        with await lock:
            # 馒头生产满了,休息生产者,唤醒消费者进行消费
            if basket.full():
                print('{0} 生产满了'.format(name))
                # 唤醒他人
                lock.notify_all()
                # 自己休息
                await lock.wait()
            else:
                # 馒头的名字
                bread_name = '{0}_{1}'.format(name, basket.counter)
                bread = Bread(bread_name)
                # 将馒头放入篮子
                basket.put(bread)
                print('>>{0} 生产馒头 {1}, size: {2}'.format(name, bread_name, basket.qsize()))
                # 计数+ 1
                basket.counter += 1
                await asyncio.sleep(random.randint(1, 2))


class Basket(Queue):
    """ 自定义的仓库 """
    # 馒头生产的计数器
    counter = 0


def main():
    lock = asyncio.Condition()
    # 篮子,用于放馒头,协程通信使用
    basket = Basket(maxsize=5)
    p1 = producer('P1', basket, lock)
    p2 = producer('P2', basket, lock)
    p3 = producer('P3', basket, lock)
    c1 = consumer('C1', basket, lock)
    c2 = consumer('C2', basket, lock)

    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.gather(p1, p2, p3, c1, c2))
    loop.close()


if __name__ == '__main__':
    main()

官方文档
https://docs.python.org/3/library/asyncio-task.html#coroutine

https://www.imooc.com/article/266370

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,012评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,628评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,653评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,485评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,574评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,590评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,596评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,340评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,794评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,102评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,276评论 1 344
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,940评论 5 339
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,583评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,201评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,441评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,173评论 2 366
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,136评论 2 352

推荐阅读更多精彩内容

  • 一. 操作系统概念 操作系统位于底层硬件与应用软件之间的一层.工作方式: 向下管理硬件,向上提供接口.操作系统进行...
    月亮是我踢弯得阅读 5,965评论 3 28
  • 1. 内存管理机制 1.1 介绍 概要赋值语句内存分析垃圾回收机制内存管理机制 目标掌握赋值语句内存分析方法掌握i...
    nimw阅读 799评论 0 2
  • 必备的理论基础 1.操作系统作用: 隐藏丑陋复杂的硬件接口,提供良好的抽象接口。 管理调度进程,并将多个进程对硬件...
    drfung阅读 3,535评论 0 5
  • 线程 操作系统线程理论 线程概念的引入背景 进程 之前我们已经了解了操作系统中进程的概念,程序并不能单独运行,只有...
    go以恒阅读 1,638评论 0 6
  • Mac OS X,UNIX,Linux,Windows等,都是多任务操作系统即操作系统可以同时运行多个任务。对于操...
    枫頔阅读 530评论 0 1