用Python操作nanomsg(三)——PubSub

上篇文章:用Python操作nanomsg(二)——PipeLine讲了PipeLine模式下的应用并分析了其优缺点,显然像PipeLine这种必须两头都连接上才能完成一次消息传递的模式并不适用于大部分场景,如果做一个消息订阅器,难道消息发布者非得等到有人连接了才能继续下一条消息的发送?显然这是不可取的,为此本文将探讨更加符合消息订阅的一种通信模式——PubSub。

PubSub

PubSub,顾名思义就是Publish(发布)和Subscribe(订阅)。

在这个模式下一个Pub Node可以被多个Sub Node连接,Pub Node发布的消息是不阻塞的:

你Sub Node没在线的话没收到就没收到,它不管你

但是如果Sub Node在线,则一定不会错过

另外PubSub中的一个特性是,Sub Node是可以指定消息前缀来过滤要接收的信息,也就是”订阅“,默认这个消息前缀是"",此时默认接收所有消息。下面来看PubSub的基本用法。

PubSub基本用法

Pub Node的用法与PipeLine中的Push Node没有太大区别,只是在新建对象时候要指定protocol为nnpy.PUB

# 2 create a object
pub_server = nnpy.Socket(nnpy.AF_SP, nnpy.PUB)
# 3 establish a sever to push message
pub_server.bind('tcp://*:4000')

Sub Node的用法也与PipeLine中的Pull Node相差不大,除了指定protocol为nnpy.SUB外,还需要进行一次订阅设置,使用setsockopt()方法,level赋为nnpy.SUB,option赋为nnpy.SUB_SUBSCRIBE,value赋你想要的消息前缀:

# 2 create a object
sub_client = nnpy.Socket(nnpy.AF_SP, nnpy.SUB)
# subscribe message to receive by prefix, default is '' and receive all messages
sub_client.setsockopt(nnpy.SUB, nnpy.SUB_SUBSCRIBE, '')
# 3 establish a sever to push message
sub_client.connect('tcp://127.0.0.1:4000')

SUBSCRIBE对消息循环的影响

首先我们为了方便,先约定消息前缀的定义规范,就是要以英文冒号":"结束,然后Pub Node发布消息时也要在消息前缀和消息本体之间用英文冒号":"进行间隔:

假设消息前缀为roleA,则Sub Node启动时,参数--subscribe要设置为roleA:

而Pub Node发送消息你还好吗?时,则要在输入框中输入roleA:你还好吗?

OK,前面提到Sub Node通过指定消息前缀来接收消息,实际使用中发现:

recv_data = sub_client.recv()这行代码中就已经有了对消息前缀的判断,如果不符合消息前缀的比较是不会结束函数往下走的,如果往下走了那就是接受的消息已经符合了你的消息内容。

这就有意思了:

我们知道recv_data此刻还是二进制数据,我们并没有告诉nanomsg我们使用了何种编码方式转换成的二进制(这里用的是utf-8),但是sub_client.recv()却能够在:

不知道编码格式的二进制数据我们给的前缀字符串 之间进行内容判断,尚且不知道怎么做到的,反正在sub_client.recv()一波神操作知晓了里面内容后,我们还是要乖乖对recv_data进行解码:

if recv_data:
    decoded_data = recv_data.decode(config.DATA_ENCODING)
    # 3 process received data
    # remove the subscribe prefix
    decoded_data = decoded_data[len(arguments.subscribe):]

因为已经确定了头部包含消息前缀,所以直接使用字符串截断即可分离出消息本体,后面对消息本体的操作就跟之前的一样了。

改进LAN-chat Program

将上述改动应用到前文中的LAN-chat Program,并完善config.py中的常量、变量后可以得到一个”消息订阅模式“的程序原型,我们在WSL中测试一下。

测试

在IDE中打开Terminal,输入bash启动Linux子系统:

启动WSL

这里使用tmux进行终端复用,没有安装的话输入命令:

sudo apt-get install tmux

tmux(terminal multiplexer)是Linux上的终端复用神器,可从一个屏幕上管理多个终端(准确说是伪终端)。使用该工具,用户可以连接或断开会话,而保持终端在后台运行。类似的工具还有screen,个人对这二者的使用感受是,用过tmux就再也不想用screen了。(引用自Guanglin

进来以后我们用快捷键Ctrl + Shift + '将窗格最大化方便查看(再次按下Ctrl + Shift + '时可恢复原大小),JetBrains大法好啊!

最大化后我们输入tmux

输入tmux进入复用界面

这时候tmux只有一个终端,我们可以通过快捷键将其复用成多个独立的终端窗格:


分成了0、1、2三个终端

三个窗格分别设置为:

窗格号 角色 配置 说明
0 Publish Node bind tcp *:4000 发布
1 Subscribe Node connect tcp 127.0.0.1:4000 --subscribe "roleA:" 订阅"roleA:"
2 Subscribe Node connect tcp 127.0.0.1:4000 --subscribe "roleB:" 订阅"roleB:"
分别启动1个pub node和2个sub node

接下来测试发送如下消息:

验证序号 内容 响应对象 验证内容
1 你好,你们在吗? 普通消息体
2 roleA:你在哪里? pannel 1 消息前缀消息体
3 roleB:告诉我A的事情。 pannel 2 消息前缀体
4 roleA:client-offline-now pannel 1 带消息前缀的下线指令
5 roleB:client-offline-now pannel 2 带消息前缀的下线指令
6 server-exit-now pannel 0 自下线指令
验证内容1

结果:正常,没有符合的消息前缀,故A、B都没有接收该消息。


验证内容2、3、4、5、6

结果:正常,相应消息前缀的消息都被订阅者正确接收,包括对订阅者和对发布者的指令也能正确执行。

完整实现

GitHub

(尚未上传)

Raw Files

以下基于3327f4cb时刻本地git 版本“PubSub v0.1.0 releases”

chat.py

# _*_coding:utf-8 _*_
# @Time    : 2020/2/5 18:41
# @Author  : Shek 
# @FileName: chat.py
# @Software: PyCharm
import argparse
from module.func import *
import config

parser = argparse.ArgumentParser(description=config.PROGRAM_DESCRIPTION)
subparsers = parser.add_subparsers()

# command 'bind'
cmd_bind = subparsers.add_parser('bind', help=config.H_BIND)
cmd_bind.add_argument('protocol', action='store', nargs='?', default=config.PROTOCOL, help=config.H_BIND_PROTOCOL)
cmd_bind.add_argument('addr', action='store', nargs='?', default=config.BIND_ADDR, help=config.H_BIND_ADDR)
cmd_bind.set_defaults(func=sub_cmd_bind)

# command 'connect'
cmd_connect = subparsers.add_parser('connect', help=config.H_CONNECT)
cmd_connect.add_argument('protocol', action='store', nargs='?', default=config.PROTOCOL, help=config.H_CONNECT_PROTOCOL)
cmd_connect.add_argument('addr', action='store', nargs='?', default=config.CONNECT_ADDR, help=config.H_CONNECT_ADDR)
cmd_connect.add_argument('--subscribe', action='store', nargs='?', default=config.CONNECT_SUBSCRIBE,
                         help=config.H_CONNECT_SUBSCRIBE)
cmd_connect.add_argument('--keep-alive', action='store_true', help=config.H_CONNECT_KEEP_ALIVE)
cmd_connect.set_defaults(func=sub_cmd_connect)

args = parser.parse_args()  # 处理输入的参数
if not hasattr(args, 'func'):
    # 无参数时跳转到-h,否则会提示 namespace object has not attribute 'func',故这里用hasattr()判断
    args = parser.parse_args(['-h'])
args.func(args)  # 跳转到对应的函数

config.py

# _*_coding:utf-8 _*_
# @Time    : 2020/2/5 8:59
# @Author  : Shek 
# @FileName: config.py
# @Software: PyCharm

# universal configuration
DATA_ENCODING = 'utf-8'
PROTOCOL = 'tcp'
HOST_BIND = '*'
HOST_CONNECT = '127.0.0.1'
PORT = '4000'
CONNECT_ADDR = '{}:{}'.format(HOST_CONNECT, PORT)
BIND_ADDR = '{}:{}'.format(HOST_BIND, PORT)
CONNECT_SUBSCRIBE = ''
# program configuration
NN_MODE = 'PUB/SUB'
PROGRAM_URL = 'http://github.com/YourGithub/RepositoryAddress'
PROGRAM_DESCRIPTION = 'A {} LAN-chat program written in Python3 {}'.format(NN_MODE, PROGRAM_URL)
ROLE_NAME_PUSH = 'push'
ROLE_NAME_PULL = 'pull'
ROLE_NAME_PUB = 'pub'
ROLE_NAME_SUB = 'sub'
# info text
I_OP_SUCCESS = 'success'
I_OP_FAILED = 'failed'
I_KEEP_ALIVE_ENABLED = 'automatically reconnect enabled'

I_SUBSCRIBE_ALL = 'configured to receive all message'
I_SUBSCRIBE_SPECIFIC_PREFIX = 'configured to receive message start with:'
# help text
H_BIND = 'bind server'
H_BIND_PROTOCOL = 'communicate protocol'
H_BIND_ADDR = '<host>:<port>'
H_CONNECT = 'connect to a server'
H_CONNECT_PROTOCOL = 'communicate protocol'
H_CONNECT_ADDR = '<host>:<port>'
H_CONNECT_KEEP_ALIVE = 'automatically reconnect when corrupted'
H_CONNECT_SUBSCRIBE = 'subscribe the message that you are interested in'
# OneWayPipe DEFAULT CONFIGURATION
# count
COUNT_SEND_SUCCESS = 0
COUNT_SEND_FAILED = 0
# flag
FLAG_CLIENT_OFFLINE = 'client-offline-now'
FLAG_SERVER_EXIT = 'server-exit-now'
# log text
L_CLIENT_CTRL_C = 'closing...'
L_CLIENT_FLAG_OFFLINE_DETECTED = 'offline flag received from server, closing...'
L_CLIENT_CLOSED = 'client closed'

L_SERVER_EXIT = 'closing...'
L_SERVER_CLOSED = 'server closed'
L_SERVER_SEND_FAILED_PREFIX = 'SEND FAILED'

func.py

# _*_coding:utf-8 _*_
# @Time    : 2020/2/6 11:42
# @Author  : Shek 
# @FileName: func.py
# @Software: PyCharm
import nnpy
import time
import datetime
import config
from module import logger


def current_datetime():
    return datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')


def sub_cmd_bind(arguments):
    # 1 initialize a logger
    log = logger.Logger(config.ROLE_NAME_PUB)
    # 2 create a object
    pub_server = nnpy.Socket(nnpy.AF_SP, nnpy.PUB)
    # 3 establish a sever to push message
    log.info('binding to {}://{} ...'.format(arguments.protocol, arguments.addr))
    result = pub_server.bind('{}://{}'.format(arguments.protocol, arguments.addr))
    # bind status
    log.info('success') if result else log.info('failed') and exit(0)

    # 4 push loop
    time.sleep(0.5)
    while True:
        content = input('Send({})>'.format(config.COUNT_SEND_SUCCESS))
        # process input message/command
        if content == config.FLAG_SERVER_EXIT:
            # exit command caught, break loop
            log.info(config.L_SERVER_EXIT)
            break
        else:
            # send message/data/command
            send_result = pub_server.send(bytes(content, encoding=config.DATA_ENCODING))
            if send_result:  # success
                config.COUNT_SEND_SUCCESS += 1
            else:  # failed (warning: in push / pull mode will never reach here)
                config.COUNT_SEND_FAILED += 1
                log.warning('{}:{}'.format(config.L_SERVER_SEND_FAILED_PREFIX, content))
    # 5 close server
    pub_server.close()
    log.info(config.L_SERVER_CLOSED)


def sub_cmd_connect(arguments):
    # 1 initialize a logger
    log = logger.Logger(config.ROLE_NAME_SUB)
    # 2 create a object
    sub_client = nnpy.Socket(nnpy.AF_SP, nnpy.SUB)
    # subscribe message to receive by prefix, default is '' and receive all messages
    sub_client.setsockopt(nnpy.SUB, nnpy.SUB_SUBSCRIBE, arguments.subscribe)
    if arguments.subscribe == '':
        log.info(config.I_SUBSCRIBE_ALL)
    else:
        log.info('{}{}'.format(config.I_SUBSCRIBE_SPECIFIC_PREFIX, str(arguments.subscribe)))
    # keep-alive: not completed yet
    if arguments.keep_alive:
        log.info(config.I_KEEP_ALIVE_ENABLED)
    # 3 connect to a server for receiving message
    log.info('connecting to {}://{}'.format(arguments.protocol, arguments.addr))
    result = sub_client.connect('{}://{}'.format(arguments.protocol, arguments.addr))
    # connect status
    log.info(config.I_OP_SUCCESS) if result else log.info(config.I_OP_FAILED) and exit(0)

    # 4 receive in loop
    time.sleep(0.5)
    while True:
        try:
            recv_data = sub_client.recv()
            if recv_data:
                decoded_data = recv_data.decode(config.DATA_ENCODING)
                # 3 process received data
                # remove the subscribe prefix
                decoded_data = decoded_data[len(arguments.subscribe):]
                # receive a go-offline flag from server, break loop
                if decoded_data == config.FLAG_CLIENT_OFFLINE:
                    log.info(config.L_CLIENT_FLAG_OFFLINE_DETECTED)
                    break
                # display message push by server
                print('{}|{}'.format(current_datetime(), decoded_data))
                # logging to text file
                log.debug(decoded_data)
        except KeyboardInterrupt:
            # ctrl + c detected
            log.info(config.L_CLIENT_CTRL_C)
            break

    # 5 close client
    sub_client.close()
    log.info(config.L_CLIENT_CLOSED)

总结

PubSub的消息机制非常适用于消息订阅、消息发布之类的应用,虽然不能保证下线的用户收到所有消息,但因为其发布消息时不是阻塞的,可以有更大的余地通过其他手段弥补,例如建立缓存数据库向未接收消息的用户继续广播,也可以建立expire机制精确地投送订阅消息。

下一章我们将讨论nanomsg的Pair机制,本系列其他文章:

内容 文章地址 说明
准备 用Python操作nanomsg(一)——准备 2020.2.7更新
PipeLine 用Python操作nanomsg(二)——PipeLine 2020.2.7更新
Pair 用Python操作nanomsg(四)——Pair 未开始
ReqRep 用Python操作nanomsg(五)——ReqRep 未开始
Survey 用Python操作nanomsg(六)——Survey 未开始
Bus 用Python操作nanomsg(七)——Bus 未开始
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,658评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,482评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,213评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,395评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,487评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,523评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,525评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,300评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,753评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,048评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,223评论 1 343
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,905评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,541评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,168评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,417评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,094评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,088评论 2 352

推荐阅读更多精彩内容