同步socket, 异步socket, 多线程socket, 多进程socket

开篇

socket常用,本文立足同步和异步socket,以及现有的socketserver库。

同步socket一般有利用socket库直接,就可以写出tcp或udp的套接字

socketserver提供的线程或进程方式的socket

利用python 3.5+的asyncio协议,封装一个协程的socket server ,普通的socket客户也可以连接。

Tcp 套接字

服务器端

from socket import *
phone=socket(AF_INET,SOCK_STREAM)        #套接字
phone.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)   #解决端口占用
phone.bind(('127.0.0.1',8080))   #绑定端口和Ip到套接字
phone.listen(5)
conn,client_addr=phone.accept()   #等待tcp接受
# data1=conn.recv(10)
# print('data1: ',data1)
# data2=conn.recv(4)
# print('data2:',data2)
#接收方不及时接收缓冲区的包,造成多个包接收(客户端发送了一段数据,服务端只收了一小部分,
#服务端下次再收的时候还是从缓冲区拿上次遗留的数据,产生粘包)

客户端

from socket import *
import time
phone=socket(AF_INET,SOCK_STREAM)
phone.connect(('127.0.0.1',8080))
# phone.send('helloworld'.encode('utf-8'))
# phone.send('egon'.encode('utf-8'))
#发送端需要等缓冲区满才发送出去,造成粘包(发送数据时间间隔很短,数据了很小,会合到一起,产生粘包)

用struct模块解决粘包问题

为字节流加上自定义固定长度报头,报头中包含字节流长度,然后一次send到对端,对端在接收时,先从缓存中取出定长的报头,然后再取真实数据

构造报头信息

#_*_coding:utf-8_*_

import struct
import binascii
import ctypes

values1 = (1, 'abc'.encode('utf-8'), 2.7)
values2 = ('defg'.encode('utf-8'),101)
s1 = struct.Struct('I3sf')
s2 = struct.Struct('4sI')

print(s1.size,s2.size)
prebuffer=ctypes.create_string_buffer(s1.size+s2.size)
print('Before : ',binascii.hexlify(prebuffer))
# t=binascii.hexlify('asdfaf'.encode('utf-8'))
# print(t)


s1.pack_into(prebuffer,0,*values1)
s2.pack_into(prebuffer,s1.size,*values2)

print('After pack',binascii.hexlify(prebuffer))
print(s1.unpack_from(prebuffer,0))
print(s2.unpack_from(prebuffer,s1.size))

s3=struct.Struct('ii')
s3.pack_into(prebuffer,0,123,123)
print('After pack',binascii.hexlify(prebuffer))
print(s3.unpack_from(prebuffer,0))

关于struct的详细用法

服务端

import socket
import subprocess
import struct
phone=socket.socket(socket.AF_INET,socket.SOCK_STREAM) #买手机
phone.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1) #就是它,在bind前加
phone.bind(('127.0.0.1',8088)) #绑定手机卡
phone.listen(5) #开机

print('starting...')
while True: #链接循环
    conn,client_addr=phone.accept() #等电话 (链接,客户的的ip和端口组成的元组)
    print('-------->',conn,client_addr)

    #收,发消息
    while True:#通信循环
        try:
            cmd=conn.recv(1024)
            print(cmd)
            if not cmd:break #针对linux
            #执行cmd命令,拿到cmd的结果,结果应该是bytes类型
            #。。。。
            res = subprocess.Popen(cmd.decode('utf-8'), shell=True,
                                   stdout=subprocess.PIPE,
                                   stderr=subprocess.PIPE)
            stdout=res.stdout.read()
            stderr=res.stderr.read()
            print(stdout)

            #先发报头(转成固定长度的bytes类型)
            header = struct.pack('i',len(stdout)+len(stderr))
            print(header)
            conn.send(header)
            #再发送命令的结果
            conn.send(stdout)
            conn.send(stderr)
        except Exception:
            break
    conn.close() #挂电话
phone.close() #关机

客户端

import socket
import struct
phone=socket.socket(socket.AF_INET,socket.SOCK_STREAM) #买手机
phone.connect(('127.0.0.1',8088)) #绑定手机卡

#发,收消息
while True:
    cmd=input('>>: ').strip()
    if not cmd:continue

    phone.send(cmd.encode('utf-8'))
    #先收报头
    header_struct=phone.recv(4)
    print(header_struct)
    unpack_res = struct.unpack('i', header_struct)
    total_size=unpack_res[0]
    print(total_size)

    #再收数据
    recv_size=0 #10241=10240+1
    total_data=b''
    while recv_size < total_size:
        recv_data=phone.recv(1024)
        print(recv_data)
        recv_size+=len(recv_data)
        print(recv_size)
        total_data+=recv_data
        print(total_data)
    # else:
    print(total_data.decode('gbk'))
phone.close()

大文件粘包问题

粘包,分包都tcp
tcp为什么会有粘包分包这些情况:
1.服务端处理不过来
2.客户端采用优化纳格尔算法,达到一定字节才发
怎么处理:
1. 客,服双方确定包头规范,根据包头的信息取包长度
2. 客户端发送带上标记位,如\n, 服务端根据标记取包

服务器端

import socket
import subprocess
import struct
import json
phone=socket.socket(socket.AF_INET,socket.SOCK_STREAM) #买手机
phone.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1) #就是它,在bind前加
phone.bind(('127.0.0.1',8082)) #绑定手机卡
phone.listen(5) #开机

print('starting...')
while True: #链接循环
    conn,client_addr=phone.accept() #等电话 (链接,客户的的ip和端口组成的元组)
    print('-------->',conn,client_addr)

    #收,发消息
    while True:#通信循环
        try:
            cmd=conn.recv(1024)
            if not cmd:break #针对linux
            #执行cmd命令,拿到cmd的结果,结果应该是bytes类型
            #。。。。
            res = subprocess.Popen(cmd.decode('utf-8'), shell=True,
                                   stdout=subprocess.PIPE,
                                   stderr=subprocess.PIPE)
            stdout=res.stdout.read()
            stderr=res.stderr.read()
            #制作报头
            header_dic = {
                'total_size': len(stdout)+len(stderr),
                'filename': None,
                'md5': None}

            header_json = json.dumps(header_dic)
            header_bytes = header_json.encode('utf-8')
            #发送阶段
            #先发报头长度
            conn.send(struct.pack('i',len(header_bytes)))
            #再发报头
            conn.send(header_bytes)

            #最后发送命令的结果
            conn.send(stdout)
            conn.send(stderr)
        except Exception:
            break
    conn.close() #挂电话
phone.close() #关机

客户端

import socket
import struct
import json
phone=socket.socket(socket.AF_INET,socket.SOCK_STREAM) #买手机
phone.connect(('127.0.0.1',8082)) #绑定手机卡

#发,收消息
while True:
    cmd=input('>>: ').strip()
    if not cmd:continue

    phone.send(cmd.encode('utf-8'))
    #先收报头的长度
    header_len=struct.unpack('i',phone.recv(4))[0]

    #再收报头
    header_bytes=phone.recv(header_len)
    header_json=header_bytes.decode('utf-8')
    header_dic=json.loads(header_json)
    total_size=header_dic['total_size']

    #最后收数据
    recv_size=0 #10241=10240+1
    total_data=b''
    while recv_size < total_size:
        recv_data=phone.recv(1024)
        recv_size+=len(recv_data)
        total_data+=recv_data
    print(total_data.decode('gbk'))
phone.close()

udp套接字

服务器端

from socket import *
udp_server=socket(AF_INET,SOCK_DGRAM)
udp_server.bind(('127.0.0.1',8088))

while True:
    msg,client_addr=udp_server.recvfrom(1024)
    print('has recv %s' %msg)
    udp_server.sendto(msg.upper(),client_addr)
    print('has send')
udp_server.close()

客户端

from socket import *
udp_client=socket(AF_INET,SOCK_DGRAM)

while True:
    msg=input('>>: ').strip()
    udp_client.sendto(msg.encode('utf-8'),('127.0.0.1',8088))
    print('has send')
    # res,server_addr=udp_client.recvfrom(1024)
    # print('====>',res.decode('utf-8'))

udp_client.close()

udp 套接字不会发生粘包

服务器端

from socket import *
udp_server=socket(AF_INET,SOCK_DGRAM)
udp_server.bind(('127.0.0.1',8089))

msg1,client_addr=udp_server.recvfrom(5)
print(msg1)

msg2,client_addr=udp_server.recvfrom(5)
print(msg2)

客户端

from socket import *
      udp_client=socket(AF_INET,SOCK_DGRAM)
      udp_client.sendto('hello'.encode('utf-8'),('127.0.0.1',8089))
     udp_client.sendto('world'.encode('utf-8'),('127.0.0.1',8089))

socketserver

封装了socket,而且解决了Io阻塞问题

服务端

# socketserver模块多进程,多线程
# 无论你开什么都是开线程,线程就有IO,这个模块帮你解决这个IO问题

# 基础版本,遇到问题,不能无限开线程,而且没有解决IO
# 线程开启跟进程开启一样
from socket import *
from threading import Thread
# s=socket(AF_INET,SOCK_STREAM)
# s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) #就是它,在bind前加
# s.bind(('127.0.0.1',8090))
# s.listen(5)
# def talk(conn,addr):
#     while True: #通信循环
#         try:
#             data=conn.recv(1024)
#             if not data:break
#             conn.send(data.upper())
#         except Exception:
#             break
#     conn.close()
# if __name__ == '__main__':
#     while True:#链接循环
#         conn,addr=s.accept()
#         p = Thread(target=talk,args=(conn,addr))
#         p.start()
#     s.close()

# socketserver模块套接字,自动处理IO问题
import socketserver
class MyTCPhandler(socketserver.BaseRequestHandler):  #必须继承这个类
    def handle(self):
        # print(self.request)  打印出来的就是conn
        # print(self.client_address)   打印出来的就是addr
        while True:
            try:
                data=self.request.recv(1024)
                if not data:break
                self.request.send(data.upper())
            except Exception:
                break
        self.request.close()
if __name__ == '__main__':
    server=socketserver.ThreadingTCPServer(('127.0.0.1',8082),MyTCPhandler)  #多线程
    # 三个参数,IP,端口,类
    # server=socketserver.ForkingTCPServer(('127.0.0.1',8082),MyTCPhandler)  #多进程
    server.allow_reuse_address=True   #重用地址
    server.serve_forever()   #永远运行,就是一直开着,相当于while True

客户端

from socket import *
client=socket(AF_INET,SOCK_STREAM)
client.connect(('127.0.0.1',8082))

while True:
    msg=input('>>: ').strip()
    if not msg:continue
    client.send(msg.encode('utf-8'))
    data=client.recv(1024)
    print(data.decode("utf-8"))

高度封装,解决阻塞

# coding: utf8
__version__ = "0.4"
# get verify process

import socket
import select
import sys
import os
import errno
try:
    import threading
except ImportError:
    import dummy_threading as threading

__all__ = ["TCPServer","UDPServer","ForkingUDPServer","ForkingTCPServer",
           "ThreadingUDPServer","ThreadingTCPServer","BaseRequestHandler",
           "StreamRequestHandler","DatagramRequestHandler",
           "ThreadingMixIn", "ForkingMixIn"]
if hasattr(socket, "AF_UNIX"):
    __all__.extend(["UnixStreamServer","UnixDatagramServer",
                    "ThreadingUnixStreamServer",
                    "ThreadingUnixDatagramServer"])


def _eintr_retry(func, *args):
    """
    捕获系统中断信号并继续

    :param func:
    :param args:
    :return:
    """
    while True:
        try:
            return func(*args)
        except (OSError, select.error) as e:
            if e.args[0] != errno.EINTR:
                raise


class BaseServer:

    timeout = None

    def __init__(self, server_address, RequestHandlerClass):

        self.server_address = server_address
        self.RequestHandlerClass = RequestHandlerClass
        self.__is_shut_down = threading.Event()  # 这个事件只是再服务器停止后通知调用者
        self.__shutdown_request = False

    def server_activate(self):
        pass

    # 启动服务器的两种选择
    # 1、一次调用一次handle_request
    # 2、直接调用serve_forver

    def serve_forever(self, poll_interval=0.5):
        self.__is_shut_down.clear()  # 确保该事件初始是未被通知的
        try:
            while not self.__shutdown_request:  # 循环判断是否要关闭服务器了

                r, w, e = _eintr_retry(select.select, [self], [], [],
                                       poll_interval)
                if self in r:
                    self._handle_request_noblock()
        finally:
            self.__shutdown_request = False
            self.__is_shut_down.set()
    def shutdown(self):
        self.__shutdown_request = True  # 设置关闭标记
        self.__is_shut_down.wait()  # 等待关闭后回发的事件
    def handle_request(self):

        # 获取timeout
        timeout = self.socket.gettimeout()
        if timeout is None:
            timeout = self.timeout
        elif self.timeout is not None:
            timeout = min(timeout, self.timeout)

        # 只select一次
        fd_sets = _eintr_retry(select.select, [self], [], [], timeout)
        if not fd_sets[0]:
            # 超时处理
            self.handle_timeout()
            return
        self._handle_request_noblock()
    def _handle_request_noblock(self):
        """
        这边是在select返回后调用的,所以直接获取数据是不会阻塞的

        :return:
        """

        try:
            request, client_address = self.get_request()
        except socket.error:
            return

        # 校验verify、处理process、(捕获异常并关闭)
        if self.verify_request(request, client_address):
            try:
                self.process_request(request, client_address)
            except:
                self.handle_error(request, client_address)
                self.shutdown_request(request)

    def handle_timeout(self):
        pass
    def verify_request(self, request, client_address):
        return True
    def process_request(self, request, client_address):
        """
        默认的处理流程:finish shutdown

        :param request:
        :param client_address:
        :return:
        """
        self.finish_request(request, client_address)  # 在finish中只是构造那个request handler即可
        self.shutdown_request(request)

    def server_close(self):
        pass

    def finish_request(self, request, client_address):
        self.RequestHandlerClass(request, client_address, self)
    def shutdown_request(self, request):
        self.close_request(request)
    def close_request(self, request):
        pass

    def handle_error(self, request, client_address):
        print '-'*40
        print 'Exception happened during processing of request from',
        print client_address
        import traceback
        traceback.print_exc() # XXX But this goes to stderr!
        print '-'*40


class TCPServer(BaseServer):

    address_family = socket.AF_INET

    socket_type = socket.SOCK_STREAM

    request_queue_size = 5

    allow_reuse_address = False  # ?

    def __init__(self, server_address, RequestHandlerClass, bind_and_activate=True):

        BaseServer.__init__(self, server_address, RequestHandlerClass)

        # 这边就是在baseserver中select的self中的东西
        self.socket = socket.socket(self.address_family,
                                    self.socket_type)
        if bind_and_activate:
            try:
                self.server_bind()  # 绑定 bind
                self.server_activate()  # 激活 listen
            except:
                self.server_close() # close
                raise

    def server_bind(self):
        if self.allow_reuse_address:
            self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.socket.bind(self.server_address)
        self.server_address = self.socket.getsockname()
    def server_activate(self):
        self.socket.listen(self.request_queue_size)
    def server_close(self):
        self.socket.close()
    def fileno(self):
        return self.socket.fileno()

    def get_request(self):
        """
        该函数在基类中每次select到IO后调用 所以可以无阻塞获得一个连接

        :return:
        """
        return self.socket.accept()

    def shutdown_request(self, request):
        # socket.shutdown
        try:
            request.shutdown(socket.SHUT_WR)
        except socket.error:
            pass
        self.close_request(request)
    def close_request(self, request):
        # socket.close
        request.close()


class UDPServer(TCPServer):

    allow_reuse_address = False  # ?

    socket_type = socket.SOCK_DGRAM

    max_packet_size = 8192

    def get_request(self):
        data, client_addr = self.socket.recvfrom(self.max_packet_size)
        return (data, self.socket), client_addr

    def server_activate(self):
        """
        udp没有listen可调

        :return:
        """
        pass

    def shutdown_request(self, request):
        """
        没什么shutdown可调

        :param request:
        :return:
        """
        self.close_request(request)

    def close_request(self, request):
        """
        也不用关闭一个请求

        :param request:
        :return:
        """
        pass


# 下述的mixin改变的是基类中关于process request的改写,也就是获取到一个请求后处理时

class ForkingMixIn:

    timeout = 300
    active_children = None
    max_children = 40

    def collect_children(self):
        """Internal routine to wait for children that have exited."""
        if self.active_children is None:
            return

        # If we're above the max number of children, wait and reap them until
        # we go back below threshold. Note that we use waitpid(-1) below to be
        # able to collect children in size(<defunct children>) syscalls instead
        # of size(<children>): the downside is that this might reap children
        # which we didn't spawn, which is why we only resort to this when we're
        # above max_children.
        while len(self.active_children) >= self.max_children:
            try:
                pid, _ = os.waitpid(-1, 0)
                self.active_children.discard(pid)
            except OSError as e:
                if e.errno == errno.ECHILD:
                    # we don't have any children, we're done
                    self.active_children.clear()
                elif e.errno != errno.EINTR:
                    break

        # Now reap all defunct children.
        for pid in self.active_children.copy():
            try:
                pid, _ = os.waitpid(pid, os.WNOHANG)
                # if the child hasn't exited yet, pid will be 0 and ignored by
                # discard() below
                self.active_children.discard(pid)
            except OSError as e:
                if e.errno == errno.ECHILD:
                    # someone else reaped it
                    self.active_children.discard(pid)

    def handle_timeout(self):
        self.collect_children()

    def process_request(self, request, client_address):

        self.collect_children()
        pid = os.fork()
        if pid:
            # 父进程
            if self.active_children is None:
                self.active_children = set()
            self.active_children.add(pid)  # 加入到父进程的活跃子进程集合中
            self.close_request(request)  # 父进程无需再管这个请求了
            return
        else:
            # 子进程
            try:
                #
                self.finish_request(request, client_address)
                self.shutdown_request(request)
                os._exit(0)
            except:
                try:
                    self.handle_error(request, client_address)
                    self.shutdown_request(request)
                finally:
                    os._exit(1)


class ThreadingMixIn:

    daemon_threads = False

    def process_request_thread(self, request, client_address):

        try:
            self.finish_request(request, client_address)
            self.shutdown_request(request)
        except:
            self.handle_error(request, client_address)
            self.shutdown_request(request)

    def process_request(self, request, client_address):

        t = threading.Thread(target=self.process_request_thread, args=(request, client_address))
        t.daemon = self.daemon_threads
        t.start()


class ForkingUDPServer(ForkingMixIn, UDPServer): pass
class ForkingTCPServer(ForkingMixIn, TCPServer): pass

class ThreadingUDPServer(ThreadingMixIn, UDPServer): pass
class ThreadingTCPServer(ThreadingMixIn, TCPServer): pass

if hasattr(socket, 'AF_UNIX'):

    class UnixStreamServer(TCPServer):
        address_family = socket.AF_UNIX

    class UnixDatagramServer(UDPServer):
        address_family = socket.AF_UNIX

    class ThreadingUnixStreamServer(ThreadingMixIn, UnixStreamServer): pass

    class ThreadingUnixDatagramServer(ThreadingMixIn, UnixDatagramServer): pass


# handler中流程:init setup handle finish
# 获得的参数:request client_address server
class BaseRequestHandler:

    def __init__(self, request, client_address, server):

        self.request = request
        self.client_address = client_address
        self.server = server

        self.setup()
        try:
            self.handle()
        finally:
            self.finish()

    def setup(self):
        pass

    def handle(self):
        pass

    def finish(self):
        pass


class StreamRequestHandler(BaseRequestHandler):

    rbufsize = -1
    wbufsize = 0
    timeout = None
    disable_nagle_algorithm = False

    def setup(self):
        """
        连接来的socket

        :return:
        """
        self.connection = self.request

        if self.timeout is not None:
            self.connection.settimeout(self.timeout)
        if self.disable_nagle_algorithm:
            self.connection.setsockopt(socket.IPPROTO_TCP,
                                       socket.TCP_NODELAY, True)

        # socket.makefile
        self.rfile = self.connection.makefile('rb', self.rbufsize)
        self.wfile = self.connection.makefile('wb', self.wbufsize)

    def finish(self):
        """
        结束连接

        :return:
        """
        if not self.wfile.closed:
            try:
                self.wfile.flush()
            except socket.error:
                # An final socket error may have occurred here, such as
                # the local error ECONNABORTED.
                pass
        self.wfile.close()
        self.rfile.close()


class DatagramRequestHandler(BaseRequestHandler):

    def setup(self):
        try:
            from cStringIO import StringIO
        except ImportError:
            from StringIO import StringIO
        self.packet, self.socket = self.request
        self.rfile = StringIO(self.packet)
        self.wfile = StringIO()

    def finish(self):
        self.socket.sendto(self.wfile.getvalue(), self.client_address)

asyncio server

import sys
import loguru
import asyncio
import logging
import uvloop

# 日志基本配置
logging.basicConfig(
    level=logging.DEBUG,
    format='%(name)s: %(message)s',
    stream=sys.stdout,
)
log = logging.getLogger('main')

# 服务基本信息
SERVER_ADDRESS = ('127.0.0.1', 10030)

"""
启动uvloop例例有三种方式
"""

# 使用原生asyncio实例
# event_loop = asyncio.get_event_loop()

# 使用uvloop策略
# asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
#event_loop = asyncio.get_event_loop()

# 创建uvloop实例
loop = uvloop.new_event_loop()
asyncio.set_event_loop(loop)
event_loop = asyncio.get_event_loop()


class EchoServer(asyncio.Protocol):

    def connection_made(self, transport):
        self.transport = transport
        self.address = transport.get_extra_info('peername')
        self.log =logging.getLogger('EchoServer_{}_{}'.format(*self.address))
        self.log.info("connection accepted")

    def data_received(self, data):
        self.log.info('received {!r}'.format(data))
        print(len(data))
        # self.transport.write(data)
        # 回复客户端
        self.transport.write(b"Received success")
        self.log.info('sent {!r}'.format("Received success"))

    def eof_received(self):
        self.log.info("received EOF")
        if self.transport.can_write_eof():
            self.transport.write_eof()

    def connection_lost(self, error):
        if error:
            self.log.error("ERROR: {}".format(error))
        else:
            self.log.debug("closing")
        super().connection_lost(error)


if __name__ == "__main__":
    try:
        factory = event_loop.create_server(EchoServer, *SERVER_ADDRESS)

        server = event_loop.run_until_complete(factory)

        log.info('starting up tcp on {} port {}'.format(*SERVER_ADDRESS))
        event_loop.run_forever()
    finally:
        server.close()
        event_loop.run_until_complete(server.wait_closed())
        event_loop.close()
        log.info("closed event loop")

客户端

import asyncio
from asyncio import AbstractEventLoop

import loguru
import sys
import logging
import functools


MESSAGES = [
    b'This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the messageThis is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the messageThis is the message.This is the message.This is the message.This is the messageThis is the message.This is the message.This is the message.This is the messageThis is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the messageThis is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the messageThis is the message.This is the message.This is the message.This is the messageThis is the message.This is the message.This is the message.This is the messageThis is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the messageThis is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the messageThis is the message.This is the message.This is the message.This is the messageThis is the message.This is the message.This is the message.This is the messageThis is the message.This is the message.This is the message.This is the message .This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the messageThis is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the messageThis is the message.This is the message.This is the message.This is the messageThis is the message.This is the message.This is the message.This is the messageThis is the message.This is the message.This is the message.This is the message .This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the messageThis is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the messageThis is the message.This is the message.This is the message.This is the messageThis is the message.This is the message.This is the message.This is the messageThis is the message.This is the message.This is the message.This is the message This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the messageThis is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the messageThis is the message.This is the message.This is the message.This is the messageThis is the message.This is the message.This is the message.This is the messageThis is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the messageThis is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the messageThis is the message.This is the message.This is the message.This is the messageThis is the message.This is the message.This is the message.This is the messageThis is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the messageThis is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the messageThis is the message.This is the message.This is the message.This is the messageThis is the message.This is the message.This is the message.This is the messageThis is the message.This is the message.This is the message.This is the message .This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the messageThis is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the messageThis is the message.This is the message.This is the message.This is the messageThis is the message.This is the message.This is the message.This is the messageThis is the message.This is the message.This is the message.This is the message .This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the messageThis is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the message.This is the messageThis is the message.This is the message.This is the message.This is the messageThis is the message.This is the message.This is the message.This is the messageThis is the message.This is the message.This is the message.This is the message',
    b'It will be sent ',

]

logging.basicConfig(
    level=logging.DEBUG,
    format='%(name)s: %(message)s',
    stream=sys.stderr,
)
log = logging.getLogger('main')

SERVER_ADDRESS = ('127.0.0.1', 10030)

event_loop: AbstractEventLoop = asyncio.get_event_loop()


class EchoClient(asyncio.Protocol):

    def __init__(self, messages, future):
        super().__init__()
        self.messages = messages
        self.log = logging.getLogger("EchoClient")
        self.f = future

    def connection_made(self, transport):
        self.transport = transport
        self.address = transport.get_extra_info("peername")
        self.log.info('connectiong to {} port {}'.format(*self.address))
        for msg in self.messages:
            transport.write(msg)
            self.log.debug("sending {!r}".format(msg))
        if transport.can_write_eof():
            transport.write_eof()

    def data_received(self, data):
        # self.log.debug("received {!r}".format(data))
        print("received {!r}".format(data))

    def eof_received(self):
        self.log.debug("receive")
        self.transport.close()
        if not self.f.done():
            self.f.set_result(True)

    def connction_lost(self, exec):
        self.log.debug("server closed connection")
        self.transport.close()
        if not self.f.done():
            self.f.set_result(True)
        super().connection_lost(exec)


client_completed = asyncio.Future()

client_factory = functools.partial(
    EchoClient,
    messages=MESSAGES,
    future=client_completed
)

factory_coroutine = event_loop.create_connection(
    client_factory,
    *SERVER_ADDRESS,
)


log.info('waiting for client to complete')

try:
    event_loop.run_until_complete(factory_coroutine)
    event_loop.run_until_complete(client_completed)
finally:
    log.info('closing event loop')
    event_loop.close()


# 普通运行方式2
# import socket
# with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
#     sock.connect(('127.0.0.1', 10030))
#     while 1:
#         data = b"test"
#         sock.send(data)
#         print(sock.recv(1024))

参考:
http://www.cnblogs.com/jokerbj/p/7422349.html

http://xiaorui.cc/2016/03/08/%E8%A7%A3%E5%86%B3golang%E5%BC%80%E5%8F%91socket%E6%9C%8D%E5%8A%A1%E6%97%B6%E7%B2%98%E5%8C%85%E5%8D%8A%E5%8C%85bug/

http://xiaorui.cc/2016/04/15/%E6%89%93%E9%80%A0mvc%E6%A1%86%E6%9E%B6%E4%B9%8Bsocket%E8%8E%B7%E5%8F%96http%E7%B2%98%E5%8C%85%E9%97%AE%E9%A2%98/

https://www.jianshu.com/p/065c53cab328

https://mozillazg.com/2017/08/python-asyncio-note-io-protocol.html#hidid3

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,826评论 6 506
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,968评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,234评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,562评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,611评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,482评论 1 302
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,271评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,166评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,608评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,814评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,926评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,644评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,249评论 3 329
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,866评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,991评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,063评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,871评论 2 354

推荐阅读更多精彩内容