基于python自定义wsgi框架
import urllib.parse
import http.client
from wsgiref.headers import Headers
from wsgiref.simple_server import make_server
import re
class Request:
"""
实现对request的封装.
"""
def __init__(self, environ):
self.environ = environ
path_info = environ.get('PATH_INFO')
self.path = path_info
@property
def args(self):
"""把url地址传递过来的query转化成字典"""
# 拿到environ中的QUERY_STRING.然后利用urllib对其进行解析
# self.environ['QUERY_STRING'] name=egon&age=12&age=23
get_arguments = urllib.parse.parse_qs(self.environ['QUERY_STRING']) # {'name': ['egon'], 'age': ['12', '23']}
res = {k: v[0] for k, v in get_arguments.items()} # {'name': 'egon', 'age': '12'}
return res
class Response:
"""
实现对response的封装
"""
def __init__(self, response=None, status=200, charset='utf8', content_type='text/html'):
self.response = [] if response is None else response # 响应的内容
self.charset = charset
self.headers = Headers()
# 响应头
content_type = '{content_type};charset={charset}'.format(content_type=content_type, charset=charset)
self.headers.add_header('content-type', content_type)
self._status = status
@property
def status(self):
status_string = http.client.responses.get(self._status, 'UNKNOWN')
print(status_string) # OK
return '{status} {status_string}'.format(status=self._status, status_string=status_string)
def __iter__(self):
for val in self.response:
if isinstance(val, bytes):
yield val
else:
yield val.encode(self.charset)
# 开始写路由系统
class NotFoundError(Exception):
""" url pattern not found """
pass
class Router:
def __init__(self):
self.routing_table = [] # 保存url pattern 和可调用对象
def add_route(self, pattern, callback):
self.routing_table.append((pattern, callback))
def match(self, path):
for (pattern, callback) in self.routing_table:
m = re.match(pattern, path)
if m:
# print(callback, m.groups())
return callback, m.groups()
raise NotFoundError()
def __call__(self, pattern):
def inner(func):
self.routing_table.append((pattern, func))
return inner
routes = Router()
def hello(request, name):
return Response(f"<h1>Hello,{name}</h1>")
def goodbye(request, name):
return Response(f"<h1>Goodbye,{name}</h1>")
routes.add_route(r'/hello/(.*)', hello)
routes.add_route(r'/goodbye/(.*)$', goodbye)
@routes(r'/world/(.*)')
def world(request, name):
return Response(f"<h1>Call方法,{name}</h1>")
def request_response_application(func):
def inner(environ, start_response):
request = Request(environ)
response = func(request) # response是Response的实例
# start_response 有两个参数,一个是响应的状态,一个是响应头
# status = '200 OK'
# headers = [('Content-Type', 'text/html; charset=utf8')]
print(response.status, response.headers.items)
start_response(response.status, response.headers.items())
return iter(response) # 调用response的__iter__方法
return inner
@request_response_application
def application(request):
try:
callback, args = routes.match(request.path)
response = callback(request, *args)
except NotFoundError:
response = Response("<h1>未发现此页面</h1>", status=404)
return response
# name = request.args.get('name', 'default_name')
# return Response(['<h1>hello {name}</h1>'.format(name=name)])
if __name__ == '__main__':
httpd = make_server('127.0.0.1', 8999, application)
httpd.serve_forever()