并发通信

多进程之间通信的限制

  • 进程之间是独立的,互不干扰的内存空间。
    我们先看个例子
a = 1  #定义全局变量
def func():
    global a
    a=2     #修改全局变量值
    print(a)

func()
print(a)

运行结果:


image.png

再看利用进程运行的例子:

import multiprocessing

a = 1  #定义全局变量

def func():
    global a
    a=2     #修改全局变量值
    print(a)

process = multiprocessing.Process(target=func)
process.start()
process.join() #等待子进程执行完再继续执行
print(a)
image.png

通过上面2个例子运行结果分析:
按通常应该都是2,应该修改了全局变量值,但是这里只有子进程是2,主进程是1。
这是因为进程之间是独立的,互不干扰的内存空间,故子进程修改的,不影响主进程的。

进程间通信的解决方案

image.png
print('--------------进程间通信的解决方案--------------')

manager = multiprocessing.Manager()  #创建一个服务器进程,并返回与其通信的管理器
list_proxy = manager.list()  #通过管理器在服务器进程中开辟一个列表空间,并返回一个代理
print(list_proxy)   #用法和list一样


def func2(list):
    list.append('a')
    print(list)

#把代理传给子进程,子进程里就可以通过这个代理,来操作共享空间来进行通信
process2 = multiprocessing.Process(target=func2, args=(list_proxy,))
process2.start()
process2.join() #等待子进程执行完再继续执行
print(list_proxy)

运行结果:
image.png
  • 一般常用的空间类型是:
    mgr.list()、mgr.dict()、mgr.Queue()

多线程之间通信的限制

注意:因为线程属于同一个进程,因此它们之间共享内存区域,因此全局变量是公共的。

import threading

a = 1
def func3():
    global a
    a = 2
    print(a)
thread = threading.Thread(target=func3)
thread.start()
thread.join()
print(a)

运行结果:
image.png

但是多线程间共享内存间存在竞争问题。

print('--------------多线程共享内存间存在竞争问题--------------')
import threading

data = 0
n = 100000

def add(n):
    global data
    for i in range(n):
        data +=i

def sub(n):
    global data

    for i in range(n):
        data -=i

t_add = threading.Thread(target=add, args=(n,))
t_sub = threading.Thread(target=sub, args=(n,))
t_add.start()
t_sub.start()
t_add.join()
t_sub.join()   #这2个地方加join阻塞目的是为了让子进程执行完,最后能在主进程看到data,所以用join来阻塞

print(data)

image.png

加了n次减了n次,结果却为负数,按正常应该为0。
使用锁来控制共享资源的访问。

print('--------------使用锁来控制共享资源的访问--------------')

import threading

data = 0
n = 1000000

lock = threading.Lock() #生成一把锁

def add(n):
    global data
    for i in range(n):
        # lock.acquire()   #加锁
        # data +=i
        # lock.release()   #释放锁
        #可以写生上下文格式
        with lock:
            data +=i

def sub(n):
    global data

    for i in range(n):
        # lock.acquire()   #加锁
        # data -=i
        # lock.release()   #释放锁
        with lock:
            data -=i

t_add = threading.Thread(target=add, args=(n,))
t_sub = threading.Thread(target=sub, args=(n,))
t_add.start()
t_sub.start()
t_add.join()
t_sub.join()   #这2个地方加join阻塞目的是为了让子进程执行完,最后能在主进程看到data,所以用join来阻塞

print(data)

运行结果:
image.png

这样才达到目的,就像去银行存钱取钱,存取不多不少!

线程与进程的安全队列

队列:先进先出,一个入口,一个出口。
image.png
  • 线程安全队列操作
    queue.Queue:
    入队: put(item)
    出队: get()
    测试空: empty() # 近似
    测试满: full() # 近似
    队列长度: qsize() # 近似
    任务结束: task_done()
    等待完成: join()
  • 进程安全队列操作
    mgr.Queue:
    入队: put(item)
    出队: get()
    测试空: empty() # 近似
    测试满: full() # 近似
    队列长度: qsize() # 近似

进程比线程少了task_done()和 join()方法。

生产者和消费者模型

所谓,生产者与消费者模型,本质上是把进程通信的问题分开考虑生产只需要往队列里面丢东西(生产者不需要关心消费者)消费者,只需要从队列里面拿东西(消费者也不需要关心生产者)。


image.png

image.png

线程实现生产者-消费者模型


print('--------------生产者与消费者模型--------------')
'''
所谓,生产者与消费者模型,本质上是把进程通信的问题分开考虑
生产者,只需要往队列里面丢东西(生产者不需要关心消费者)
消费者,只需要从队列里面拿东西(消费者也不需要关心生产者)
'''

print('--------------多线程的消费者与生产者模式--------------')
'''
生产者:没满,则生产,只关心队列是否已满。满了就阻塞。
消费者:只关心队列是否为空。不为空,则消费,为空则阻塞。

'''
import threading
import queue
import random
import time

class Producer(threading.Thread):  #生产者
    def __init__(self, queue):
        super().__init__()
        self.queue = queue
    def run(self):
        while True:
            item = random.randint(0, 10) #创建0~99
            #只要队列没满,就向队列中添加数据
            self.queue.put(item)
            print('生产者-->生产:%s'%item)
            time.sleep(1)

class Customer(threading.Thread):
    def __init__(self, queue):
        super().__init__()
        self.queue = queue
    def run(self):
        while True:
            #只要队列不为空,就从队列中取数据
            itme = self.queue.get()
            print('消费者-->消费:%s'%itme)
            time.sleep(1)

q =queue.Queue(5)  #长度为5
producer = Producer(q)
custormer = Customer(q)
producer.start()
custormer.start()
producer.join()

运行结果:
image.png

进程实现生产者-消费者模型


import multiprocessing
import queue
import random
import time

class Producer(multiprocessing.Process):  #生产者
    def __init__(self, queue):
        super().__init__()
        self.queue = queue
    def run(self):
        while True:
            item = random.randint(0, 10) #创建0~99
            #只要队列没满,就向队列中添加数据
            self.queue.put(item)
            print('生产者-->生产:%s'%item)
            time.sleep(1)

class Customer(multiprocessing.Process):
    def __init__(self, queue):
        super().__init__()
        self.queue = queue
    def run(self):
        while True:
            #只要队列不为空,就从队列中取数据
            itme = self.queue.get()
            print('消费者-->消费:%s'%itme)
            time.sleep(1)

manager = multiprocessing.Manager()  #创建一个服务器进程,并返回与其通信的管理器

q =manager.Queue(5)  #长度为5
producer = Producer(q)
custormer = Customer(q)
producer.start()
custormer.start()
producer.join()

运行结果:
image.png
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • 必备的理论基础 1.操作系统作用: 隐藏丑陋复杂的硬件接口,提供良好的抽象接口。 管理调度进程,并将多个进程对硬件...
    drfung阅读 3,588评论 0 5
  • 本文是我自己在秋招复习时的读书笔记,整理的知识点,也是为了防止忘记,尊重劳动成果,转载注明出处哦!如果你也喜欢,那...
    波波波先森阅读 11,334评论 4 56
  • 七绝.贺成梅感恩酒会有感 文‖党爱元 各路亲朋人气旺, 灯光耀眼溢光芒。 美肴佳味心真切, 团队精神凝聚强。
    落寞在凉州的烟雨里阅读 346评论 0 1
  • 康熙皇帝得了一种怪病,宫中御医把所有的名贵药材都用遍了, 就是不见病情好转,他一怒之下停止了用药。这天,康熙独自出...
    转发件阅读 194评论 0 0
  • 又是一个谈论新老板和旧老板的时节,我知道你也经常会在私下谈论自己的老板,但像我这样有勇气公开的直白的残暴的谈论和比...
    blueYork阅读 665评论 0 4