环境
Python 3.6.4
简介
Blinker是一个基于Python的强大的信号库,支持一对一、一对多的订阅发布模式,支持发送任意大小的数据等等,且线程安全。
安装
pip install blinker
使用
signal
为单例模式
signal 使用了单例模式,允许代码的不同模块得到相同的signal,而不用互相传参。
In [1]: from blinker import signal
In [2]: a = signal('signal_test')
In [3]: b = signal('signal_test')
In [4]: a is b
Out[4]: True
订阅信号
使用.connect(func)
方法来订阅一个信号,当信号发布时,该信号的订阅者会执行func
。
In [5]: def subscriber(sender):
...: print('Got a signal sent by {}'.format(sender))
...:
In [6]: ready = signal('ready')
In [7]: ready.connect(subscriber)
Out[7]: <function __main__.subscriber(sender)>
发布信号
使用.send()
方法来发布信号,会通知所有订阅者,如果没有订阅者则什么都不会发生。
In [12]: class Processor(object):
...:
...: def __init__(self, name):
...: self.name = name
...:
...: def go(self):
...: ready = signal('ready')
...: ready.send(self)
...: print('Processing...')
...: complete = signal('complete')
...: complete.send(self)
...:
...: def __repr__(self):
...: return '<Processor {}>'.format(self.name)
...:
In [13]: processor_a = Processor('a')
In [14]: processor_a.go()
Got a signal sent by <Processor a>
Processing...
订阅指定的发布者
.connect()
方法接收一个可选参数sender
,可用于接收指定发布者的信号。
In [18]: def b_subscriber():
...: print('Caught signal from peocessor_b')
...:
In [19]: ready.connect(b_subscriber, sender=processor_b)
Out[19]: <function __main__.b_subscriber(sender)>
In [20]: processor_a.go()
Got a signal sent by <Processor a>
Processing...
In [21]: processor_b.go()
Got a signal sent by <Processor b>
Caught signal from peocessor_b
Processing...
订阅者接收发布者传递的数据
除了之前的通过.connect
方法来订阅外,还可以通过装饰器的方法来订阅。
订阅的方法可以接收发布者传递的数据。
In [22]: send_data = signal('send-data')
In [23]: @send_data.connect
...: def receive_data(sender, **kw):
...: print('Caught signal from {}, data: {}'.format(sender, kw))
...: return 'received!'
...:
...:
In [24]: result = send_data.send('anonymous', abc=123)
Caught signal from anonymous, data: {'abc': 123}
.send
方法的返回值是一个由元组组成的列表,每个元组的第一个值为订阅者的方法,第二个值为订阅者的返回值
In [25]: result
Out[25]: [(<function __main__.receive_data(sender, **kw)>, 'received!')]
匿名信号
信号可以是匿名的,可以使用Signal
类来创建唯一的信号(S
大写,这个类不像之前的signal
,为非单例模式)。
下面的on_ready
和on_complete
为两个不同的信号
In [28]: from blinker import Signal
In [29]: class AltProcessor(object):
...: on_ready = Signal()
...: on_complete = Signal()
...:
...: def __init__(self, name):
...: self.name = name
...:
...: def go(self):
...: self.on_ready.send(self)
...: print('Altername processing')
...: self.on_complete.send(self)
...:
...: def __repr__(self):
...: return '<AltProcessor {}>'.format(self.name)
通过装饰器来订阅
在订阅者接收发布者传递的数据中简单地演示了使用装饰器来订阅,但是那种订阅方式不支持订阅指定的发布者,这时候我们可以用.connect_via(sender)
In [31]: @dice_roll.connect_via(1)
...: @dice_roll.connect_via(3)
...: @dice_roll.connect_via(5)
...: def odd_subscriver(sender):
...: print('Observed dice roll {}'.format(sender))
...:
In [32]: result = dice_roll.send(3)
Observed dice roll 3
In [33]: result = dice_roll.send(1)
Observed dice roll 1
In [34]: result = dice_roll.send(5)
Observed dice roll 5
In [35]: result = dice_roll.send(2)
检查信号是否有订阅者
In [37]: bool(signal('ready').receivers)
Out[37]: True
In [38]: bool(signal('complete').receivers)
Out[38]: False
In [39]: bool(AltProcessor.on_complete.receivers)
Out[39]: False
In [40]: signal('ready').has_receivers_for(processor_a)
Out[40]: True