上篇文章:用Python操作nanomsg(二)——PipeLine讲了PipeLine模式下的应用并分析了其优缺点,显然像PipeLine这种必须两头都连接上才能完成一次消息传递的模式并不适用于大部分场景,如果做一个消息订阅器,难道消息发布者非得等到有人连接了才能继续下一条消息的发送?显然这是不可取的,为此本文将探讨更加符合消息订阅的一种通信模式——PubSub。
PubSub
PubSub,顾名思义就是Publish(发布)和Subscribe(订阅)。
在这个模式下一个Pub Node可以被多个Sub Node连接,Pub Node发布的消息是不阻塞的:
你Sub Node没在线的话没收到就没收到,它不管你
但是如果Sub Node在线,则一定不会错过
另外PubSub中的一个特性是,Sub Node是可以指定消息前缀来过滤要接收的信息,也就是”订阅“,默认这个消息前缀是"",此时默认接收所有消息。下面来看PubSub的基本用法。
PubSub基本用法
Pub Node的用法与PipeLine中的Push Node没有太大区别,只是在新建对象时候要指定protocol为nnpy.PUB
:
# 2 create a object
pub_server = nnpy.Socket(nnpy.AF_SP, nnpy.PUB)
# 3 establish a sever to push message
pub_server.bind('tcp://*:4000')
Sub Node的用法也与PipeLine中的Pull Node相差不大,除了指定protocol为nnpy.SUB
外,还需要进行一次订阅设置,使用setsockopt()
方法,level赋为nnpy.SUB
,option赋为nnpy.SUB_SUBSCRIBE
,value赋你想要的消息前缀:
# 2 create a object
sub_client = nnpy.Socket(nnpy.AF_SP, nnpy.SUB)
# subscribe message to receive by prefix, default is '' and receive all messages
sub_client.setsockopt(nnpy.SUB, nnpy.SUB_SUBSCRIBE, '')
# 3 establish a sever to push message
sub_client.connect('tcp://127.0.0.1:4000')
SUBSCRIBE对消息循环的影响
首先我们为了方便,先约定消息前缀的定义规范,就是要以英文冒号":"结束,然后Pub Node发布消息时也要在消息前缀和消息本体之间用英文冒号":"进行间隔:
假设消息前缀为roleA
,则Sub Node启动时,参数--subscribe
要设置为roleA:
;
而Pub Node发送消息你还好吗?时,则要在输入框中输入roleA:你还好吗?
。
OK,前面提到Sub Node通过指定消息前缀来接收消息,实际使用中发现:
在recv_data = sub_client.recv()
这行代码中就已经有了对消息前缀的判断,如果不符合消息前缀的比较是不会结束函数往下走的,如果往下走了那就是接受的消息已经符合了你的消息内容。
这就有意思了:
我们知道recv_data此刻还是二进制数据,我们并没有告诉nanomsg我们使用了何种编码方式转换成的二进制(这里用的是utf-8),但是sub_client.recv()
却能够在:
不知道编码格式的二进制数据 和 我们给的前缀字符串 之间进行内容判断,尚且不知道怎么做到的,反正在sub_client.recv()
一波神操作知晓了里面内容后,我们还是要乖乖对recv_data进行解码:
if recv_data:
decoded_data = recv_data.decode(config.DATA_ENCODING)
# 3 process received data
# remove the subscribe prefix
decoded_data = decoded_data[len(arguments.subscribe):]
因为已经确定了头部包含消息前缀,所以直接使用字符串截断即可分离出消息本体,后面对消息本体的操作就跟之前的一样了。
改进LAN-chat Program
将上述改动应用到前文中的LAN-chat Program,并完善config.py中的常量、变量后可以得到一个”消息订阅模式“的程序原型,我们在WSL中测试一下。
测试
在IDE中打开Terminal,输入bash
启动Linux子系统:
这里使用tmux进行终端复用,没有安装的话输入命令:
sudo apt-get install tmux
tmux(terminal multiplexer)是Linux上的终端复用神器,可从一个屏幕上管理多个终端(准确说是伪终端)。使用该工具,用户可以连接或断开会话,而保持终端在后台运行。类似的工具还有screen,个人对这二者的使用感受是,用过tmux就再也不想用screen了。(引用自Guanglin)
进来以后我们用快捷键Ctrl + Shift + '
将窗格最大化方便查看(再次按下Ctrl + Shift + '
时可恢复原大小),JetBrains大法好啊!
最大化后我们输入tmux
:
这时候tmux只有一个终端,我们可以通过快捷键将其复用成多个独立的终端窗格:
三个窗格分别设置为:
窗格号 | 角色 | 配置 | 说明 |
---|---|---|---|
0 | Publish Node | bind tcp *:4000 |
发布 |
1 | Subscribe Node | connect tcp 127.0.0.1:4000 --subscribe "roleA:" |
订阅"roleA:" |
2 | Subscribe Node | connect tcp 127.0.0.1:4000 --subscribe "roleB:" |
订阅"roleB:" |
接下来测试发送如下消息:
验证序号 | 内容 | 响应对象 | 验证内容 |
---|---|---|---|
1 | 你好,你们在吗? | 无 | 普通消息体 |
2 | roleA:你在哪里? | pannel 1 | 消息前缀消息体 |
3 | roleB:告诉我A的事情。 | pannel 2 | 消息前缀体 |
4 | roleA:client-offline-now | pannel 1 | 带消息前缀的下线指令 |
5 | roleB:client-offline-now | pannel 2 | 带消息前缀的下线指令 |
6 | server-exit-now | pannel 0 | 自下线指令 |
结果:正常,没有符合的消息前缀,故A、B都没有接收该消息。
结果:正常,相应消息前缀的消息都被订阅者正确接收,包括对订阅者和对发布者的指令也能正确执行。
完整实现
GitHub
(尚未上传)
Raw Files
chat.py
# _*_coding:utf-8 _*_
# @Time : 2020/2/5 18:41
# @Author : Shek
# @FileName: chat.py
# @Software: PyCharm
import argparse
from module.func import *
import config
parser = argparse.ArgumentParser(description=config.PROGRAM_DESCRIPTION)
subparsers = parser.add_subparsers()
# command 'bind'
cmd_bind = subparsers.add_parser('bind', help=config.H_BIND)
cmd_bind.add_argument('protocol', action='store', nargs='?', default=config.PROTOCOL, help=config.H_BIND_PROTOCOL)
cmd_bind.add_argument('addr', action='store', nargs='?', default=config.BIND_ADDR, help=config.H_BIND_ADDR)
cmd_bind.set_defaults(func=sub_cmd_bind)
# command 'connect'
cmd_connect = subparsers.add_parser('connect', help=config.H_CONNECT)
cmd_connect.add_argument('protocol', action='store', nargs='?', default=config.PROTOCOL, help=config.H_CONNECT_PROTOCOL)
cmd_connect.add_argument('addr', action='store', nargs='?', default=config.CONNECT_ADDR, help=config.H_CONNECT_ADDR)
cmd_connect.add_argument('--subscribe', action='store', nargs='?', default=config.CONNECT_SUBSCRIBE,
help=config.H_CONNECT_SUBSCRIBE)
cmd_connect.add_argument('--keep-alive', action='store_true', help=config.H_CONNECT_KEEP_ALIVE)
cmd_connect.set_defaults(func=sub_cmd_connect)
args = parser.parse_args() # 处理输入的参数
if not hasattr(args, 'func'):
# 无参数时跳转到-h,否则会提示 namespace object has not attribute 'func',故这里用hasattr()判断
args = parser.parse_args(['-h'])
args.func(args) # 跳转到对应的函数
config.py
# _*_coding:utf-8 _*_
# @Time : 2020/2/5 8:59
# @Author : Shek
# @FileName: config.py
# @Software: PyCharm
# universal configuration
DATA_ENCODING = 'utf-8'
PROTOCOL = 'tcp'
HOST_BIND = '*'
HOST_CONNECT = '127.0.0.1'
PORT = '4000'
CONNECT_ADDR = '{}:{}'.format(HOST_CONNECT, PORT)
BIND_ADDR = '{}:{}'.format(HOST_BIND, PORT)
CONNECT_SUBSCRIBE = ''
# program configuration
NN_MODE = 'PUB/SUB'
PROGRAM_URL = 'http://github.com/YourGithub/RepositoryAddress'
PROGRAM_DESCRIPTION = 'A {} LAN-chat program written in Python3 {}'.format(NN_MODE, PROGRAM_URL)
ROLE_NAME_PUSH = 'push'
ROLE_NAME_PULL = 'pull'
ROLE_NAME_PUB = 'pub'
ROLE_NAME_SUB = 'sub'
# info text
I_OP_SUCCESS = 'success'
I_OP_FAILED = 'failed'
I_KEEP_ALIVE_ENABLED = 'automatically reconnect enabled'
I_SUBSCRIBE_ALL = 'configured to receive all message'
I_SUBSCRIBE_SPECIFIC_PREFIX = 'configured to receive message start with:'
# help text
H_BIND = 'bind server'
H_BIND_PROTOCOL = 'communicate protocol'
H_BIND_ADDR = '<host>:<port>'
H_CONNECT = 'connect to a server'
H_CONNECT_PROTOCOL = 'communicate protocol'
H_CONNECT_ADDR = '<host>:<port>'
H_CONNECT_KEEP_ALIVE = 'automatically reconnect when corrupted'
H_CONNECT_SUBSCRIBE = 'subscribe the message that you are interested in'
# OneWayPipe DEFAULT CONFIGURATION
# count
COUNT_SEND_SUCCESS = 0
COUNT_SEND_FAILED = 0
# flag
FLAG_CLIENT_OFFLINE = 'client-offline-now'
FLAG_SERVER_EXIT = 'server-exit-now'
# log text
L_CLIENT_CTRL_C = 'closing...'
L_CLIENT_FLAG_OFFLINE_DETECTED = 'offline flag received from server, closing...'
L_CLIENT_CLOSED = 'client closed'
L_SERVER_EXIT = 'closing...'
L_SERVER_CLOSED = 'server closed'
L_SERVER_SEND_FAILED_PREFIX = 'SEND FAILED'
func.py
# _*_coding:utf-8 _*_
# @Time : 2020/2/6 11:42
# @Author : Shek
# @FileName: func.py
# @Software: PyCharm
import nnpy
import time
import datetime
import config
from module import logger
def current_datetime():
return datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
def sub_cmd_bind(arguments):
# 1 initialize a logger
log = logger.Logger(config.ROLE_NAME_PUB)
# 2 create a object
pub_server = nnpy.Socket(nnpy.AF_SP, nnpy.PUB)
# 3 establish a sever to push message
log.info('binding to {}://{} ...'.format(arguments.protocol, arguments.addr))
result = pub_server.bind('{}://{}'.format(arguments.protocol, arguments.addr))
# bind status
log.info('success') if result else log.info('failed') and exit(0)
# 4 push loop
time.sleep(0.5)
while True:
content = input('Send({})>'.format(config.COUNT_SEND_SUCCESS))
# process input message/command
if content == config.FLAG_SERVER_EXIT:
# exit command caught, break loop
log.info(config.L_SERVER_EXIT)
break
else:
# send message/data/command
send_result = pub_server.send(bytes(content, encoding=config.DATA_ENCODING))
if send_result: # success
config.COUNT_SEND_SUCCESS += 1
else: # failed (warning: in push / pull mode will never reach here)
config.COUNT_SEND_FAILED += 1
log.warning('{}:{}'.format(config.L_SERVER_SEND_FAILED_PREFIX, content))
# 5 close server
pub_server.close()
log.info(config.L_SERVER_CLOSED)
def sub_cmd_connect(arguments):
# 1 initialize a logger
log = logger.Logger(config.ROLE_NAME_SUB)
# 2 create a object
sub_client = nnpy.Socket(nnpy.AF_SP, nnpy.SUB)
# subscribe message to receive by prefix, default is '' and receive all messages
sub_client.setsockopt(nnpy.SUB, nnpy.SUB_SUBSCRIBE, arguments.subscribe)
if arguments.subscribe == '':
log.info(config.I_SUBSCRIBE_ALL)
else:
log.info('{}{}'.format(config.I_SUBSCRIBE_SPECIFIC_PREFIX, str(arguments.subscribe)))
# keep-alive: not completed yet
if arguments.keep_alive:
log.info(config.I_KEEP_ALIVE_ENABLED)
# 3 connect to a server for receiving message
log.info('connecting to {}://{}'.format(arguments.protocol, arguments.addr))
result = sub_client.connect('{}://{}'.format(arguments.protocol, arguments.addr))
# connect status
log.info(config.I_OP_SUCCESS) if result else log.info(config.I_OP_FAILED) and exit(0)
# 4 receive in loop
time.sleep(0.5)
while True:
try:
recv_data = sub_client.recv()
if recv_data:
decoded_data = recv_data.decode(config.DATA_ENCODING)
# 3 process received data
# remove the subscribe prefix
decoded_data = decoded_data[len(arguments.subscribe):]
# receive a go-offline flag from server, break loop
if decoded_data == config.FLAG_CLIENT_OFFLINE:
log.info(config.L_CLIENT_FLAG_OFFLINE_DETECTED)
break
# display message push by server
print('{}|{}'.format(current_datetime(), decoded_data))
# logging to text file
log.debug(decoded_data)
except KeyboardInterrupt:
# ctrl + c detected
log.info(config.L_CLIENT_CTRL_C)
break
# 5 close client
sub_client.close()
log.info(config.L_CLIENT_CLOSED)
总结
PubSub的消息机制非常适用于消息订阅、消息发布之类的应用,虽然不能保证下线的用户收到所有消息,但因为其发布消息时不是阻塞的,可以有更大的余地通过其他手段弥补,例如建立缓存数据库向未接收消息的用户继续广播,也可以建立expire机制精确地投送订阅消息。
下一章我们将讨论nanomsg的Pair机制,本系列其他文章:
内容 | 文章地址 | 说明 |
---|---|---|
准备 | 用Python操作nanomsg(一)——准备 | 2020.2.7更新 |
PipeLine | 用Python操作nanomsg(二)——PipeLine | 2020.2.7更新 |
Pair | 用Python操作nanomsg(四)——Pair | 未开始 |
ReqRep | 用Python操作nanomsg(五)——ReqRep | 未开始 |
Survey | 用Python操作nanomsg(六)——Survey | 未开始 |
Bus | 用Python操作nanomsg(七)——Bus | 未开始 |