PipeLine基本用法
继用Python操作nanomsg(一)——准备之后,本文着重介绍nanomsg的PipeLine通信模式。
在PipeLine模式中,Socket Type为NN_PUSH的可以发送,Socket Type为NN_PULL的可以接收:
建立PUSH节点发送数据(Establish PUSH node to send data)
import nnpy
node0 = nnpy.Socket(nnpy.AF_SP, nnpy.PUSH)
node0.bind('tcp://*:4000')
# 发送数据
node0.send(bytes('你好', encoding='utf-8'))
建立PULL节点接收数据(Establish PULL node to receive data)
import nnpy
node1 = nnpy.Socket(nnpy.AF_SP, nnpy.PULL)
node1.connect('tcp://127.0.0.1:4000')
# 接收数据
recv_data = node1.recv()
data = recv_data.decode('utf-8')
注意的是发送和接收的数据都是二进制,发送前和接收后分别要进行编码和解码,这里使用utf-8作为统一格式。
没错就是这么简单,直接开始项目练手。
PipeLine练习项目
单向管道局域网聊天程序(PipeLine LAN-chat program)
这个LAN-chat program是一个命令行程序,通过子命令在PUSH或PULL模式下运行。
编写cli入口
首先明确我们的子命令和其参数,先简单一些,实现发送文字消息功能需要明确的内容为如下三部分:
子命令(sub-commands)
mode | sub-command | command | description |
---|---|---|---|
PUSH | bind | chat.py bind [protocol] [addr] |
run by server |
PULL | connect | chat.py connect [--keep-alive] [protocol] [addr] |
run by client |
主要由两个子命令bind和connect组成,分别启动push server和pull client两个不同角色。
子命令参数(arguments of sub-commands)
argument | default value | description |
---|---|---|
protocol | tcp | inproc/tcp/udp/ws |
addr | *:4000 for PUSH and 127.0.01:4000 for PULL | |
--keep-alive | False | (未完善) |
参数默认值定义在config.py
中,默认protocol=tcp
,addr=*:4000
(PULL模式为127.0.0.1:4000
)。
标志(flags)
flag variable name | default value | send by | react by | action |
---|---|---|---|---|
FLAG_CLIENT_OFFLINE | client-offline-now | PUSH | PULL | PULL节点结束进程 |
FLAG_SERVER_EXIT | server-exit-now | PUSH | PUSH | PUSH节点结束进程 |
以上flag作用于其被作为消息发送时可触发对应的动作。
关于包含子命令可参考前期文章:给Python脚本带上子命令(sub-commands),基于此可得到cli入口文件如下:
# _*_coding:utf-8 _*_
# @Time : 2020/2/5 18:41
# @Author : Shek
# @FileName: OneWayPipe_CLI.py
# @Software: PyCharm
import argparse
from module.func import *
import config
parser = argparse.ArgumentParser(description=config.PROGRAM_DESCRIPTION)
subparsers = parser.add_subparsers()
# command 'bind'
cmd_bind = subparsers.add_parser('bind', help=config.H_BIND)
cmd_bind.add_argument('protocol', action='store', nargs='?', default=config.PROTOCOL, help=config.H_BIND_PROTOCOL)
cmd_bind.add_argument('addr', action='store', nargs='?', default=config.BIND_ADDR, help=config.H_BIND_ADDR)
cmd_bind.set_defaults(func=sub_cmd_bind)
# command 'connect'
cmd_connect = subparsers.add_parser('connect', help=config.H_CONNECT)
cmd_connect.add_argument('protocol', action='store', nargs='?', default=config.PROTOCOL, help=config.H_CONNECT_PROTOCOL)
cmd_connect.add_argument('addr', action='store', nargs='?', default=config.CONNECT_ADDR, help=config.H_CONNECT_ADDR)
cmd_connect.add_argument('--keep-alive', action='store_true', help=config.H_CONNECT_KEEP_ALIVE)
cmd_connect.set_defaults(func=sub_cmd_connect)
args = parser.parse_args() # 处理输入的参数
if not hasattr(args, 'func'):
# 无参数时跳转到-h
#否则会提示 namespace object has not attribute 'func',故这里用hasattr()判断
args = parser.parse_args(['-h'])
args.func(args) # 跳转到对应的函数
sub_cmd_bind:服务端消息处理函数
1.创建日志写入对象,这是我整合的一个自用类,日志可同时写入到本地文件和输出到终端,用来替代printf():
# 1 initialize a logger
log = logger.Logger(config.LOG_NAME_PUSH)
2.创建nnpy.Socket对象,装入nnpy.PUSH到其参数protocol中,绑定本地地址tcp://*:4000:
# 2 create object
push_server = nnpy.Socket(nnpy.AF_SP, nnpy.PUSH)
# 3 establish a sever to push message
log.info('binding to {}://{} ...'.format(arguments.protocol, arguments.addr))
result = push_server.bind('{}://{}'.format(arguments.protocol, arguments.addr))
# bind status
log.info('success') if result else log.info('failed') and exit(0)
3.建立消息发送循环:
# 4 push loop
time.sleep(0.5)
while True:
content = input('Send>')
# send message/data/command
send_result = push_server.send(bytes(content, encoding=config.DATA_ENCODING))
# 5 close server
push_server.close()
同Socket一样,网络中传输过程中统一走的是二进制,故发送和接收需要进行编码和解码:
str转二进制:bin = bytes(content, encoding='utf-8')
二进制转str:content = bin.decode('utf-8')
但是这个循环是有问题的:这个While跳不出来,所以我们需要设置一个标志信息,当发送这个标志信息的时候退出服务端的消息循环,这是前面设置flags的目的。硬件设计中flag通常设置为整数,如0x77,这里方便起见直接使用字符串即可。
4.加入对flag的判断:
content = input('Send({})>'.format(config.COUNT_SEND_SUCCESS))
# process input message/command
if content == config.FLAG_SERVER_EXIT:
# exit command caught, break loop
log.info(config.L_SERVER_EXIT)
break
else:
# send message/data/command
...
5.消息发送次数统计:
# send message/data/command
send_result = push_server.send(bytes(content, encoding=config.DATA_ENCODING))
if send_result: # success
config.COUNT_SEND_SUCCESS += 1
else: # failed (warning: in push / pull mode will never reach here)
config.COUNT_SEND_FAILED += 1
log.warning('{}:{}'.format(config.L_SERVER_SEND_FAILED_PREFIX, content))
发送次数统计仅为美化,另外这里发送后成功的返回值是什么,所以这里的发送成功统计暂时没有意义,约等于发送次数统计。
6.完成:
sub_cmd_connect:编写客户端消息处理函数
1.同样先创建日志写入对象:
# 1 initialize a logger
log = logger.Logger(config.LOG_NAME_PULL)
2.创建nnpy.Socket对象,装入nnpy.PULL到其参数protocol中,连接地址tcp://127.0.0.1:4000(这里keep_alive参数暂时没有利用起来):
# 2 create object
pull_client = nnpy.Socket(nnpy.AF_SP, nnpy.PULL)
# not completed yet
if arguments.keep_alive:
print(config.I_KEEP_ALIVE_ENABLED)
# 3 connect to a server for receiving message
log.info('connecting to {}://{}'.format(arguments.protocol, arguments.addr))
result = pull_client.connect('{}://{}'.format(arguments.protocol, arguments.addr))
# connect status
log.info(config.I_OP_SUCCESS) if result else log.info(config.I_OP_FAILED) and exit(0)
4.创建消息接受循环,根据服务端的经验,同样加入跳出标志,一个是客户端本机按Ctrl + C时利用触发的KeyboardInterrupt异常跳出循环,另一个是接收服务端发送的FLAG_CLIENT_OFFLINE标志信息自行下线:
# 4 receive in loop
time.sleep(0.5)
while True:
try:
recv_data = pull_client.recv()
if recv_data:
decoded_data = recv_data.decode(config.DATA_ENCODING)
# 3 process received data
if decoded_data == config.FLAG_CLIENT_OFFLINE:
# receive a go-offline flag from server, break loop
log.info(config.L_CLIENT_FLAG_OFFLINE_DETECTED)
break
# display message push by server
print('{} {}'.format(current_datetime(), decoded_data))
# logging to text file
log.debug(decoded_data)
except KeyboardInterrupt:
# ctrl + c detected
log.info(config.L_CLIENT_CTRL_C)
break
# 5 close client
pull_client.close()
log.info(config.L_CLIENT_CLOSED)
5.完成:
测试运行
主要工作已完成,可能有眼细的同学看到上述代码中有很多config.开头的变量,这是为了方便日后维护,故把配置信息单独放在config.py中。
总结
上述虽然实现了基本的通信功能,但是很明显PipeLine模式不能满足“局域网聊天”的这个需要,首先它不支持双向通信,也不支持异步收发。在后续的模式测试中我们再对其进行改进。
本系列其他文章:
内容 | 文章地址 | 说明 |
---|---|---|
准备 | 用Python操作nanomsg(一)——准备 | 2020.2.7更新 |
PushPub | 用Python操作nanomsg(三)——PubSub | 2020.2.8更新 |
Pair | 用Python操作nanomsg(四)——Pair | 未开始 |
ReqRep | 用Python操作nanomsg(五)——ReqRep | 未开始 |
Survey | 用Python操作nanomsg(六)——Survey | 未开始 |
Bus | 用Python操作nanomsg(七)——Bus | 未开始 |